Posted to commits@airflow.apache.org by "Sam Danbury (Jira)" <ji...@apache.org> on 2019/09/26 13:02:00 UTC

[jira] [Created] (AIRFLOW-5559) Kubernetes Executor operator-level executor_config override applies to all operators in a DAG

Sam Danbury created AIRFLOW-5559:
------------------------------------

             Summary: Kubernetes Executor operator-level executor_config override applies to all operators in a DAG
                 Key: AIRFLOW-5559
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5559
             Project: Apache Airflow
          Issue Type: Bug
          Components: executor-kubernetes
    Affects Versions: 1.10.4
            Reporter: Sam Danbury
            Assignee: Daniel Imberman


When applying requests and limits to the worker pods spun up by the Kubernetes Executor (using the executor_config argument), the last operator to set the resources wins, and *all* worker pods in the DAG get those settings applied, not just the pod for that single operator.

 
{code:python}
from datetime import datetime, timedelta

from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(hours=1)
}

dag = DAG(
    'two_tasked_dag',
    start_date=datetime(2018, 8, 21),
    default_args=default_args,
    catchup=False
)

a = BashOperator(
    task_id="A",
    bash_command="""
echo hello
""",
    dag=dag,
    executor_config={
        "KubernetesExecutor": {
            "request_memory": "700Mi",
            "request_cpu": "300m",
            "limit_memory": "900Mi",
            "limit_cpu": "500m",
        }
    }
)

b = BashOperator(
    task_id="B",
    bash_command="""
echo hello
""",
    dag=dag,
    executor_config={
        "KubernetesExecutor": {
            "request_memory": "800Mi",
            "request_cpu": "200m",
            "limit_memory": "1Gi",
            "limit_cpu": "600m",
        }
    }
)
{code}

 
{code:java}
$ kubectl get pods -n airflow
NAME READY STATUS RESTARTS AGE
airflow-postgresql-5bf5b6ddf5-f86sz 1/1 Running 0 22h
airflow-scheduler-6496dfd5db-cbhkc 2/2 Running 2 52m
airflow-web-7d76c9c4c-f7cpz 3/3 Running 3 52m
twotaskeddaga-350ad7a98c4b4908a8bfb02478e346a6 1/1 Running 0 1m
twotaskeddagb-faa8831f58b94b2fa771b62cb6bb952e 1/1 Running 0 1m

$ kubectl describe pod -n airflow twotaskeddaga-350ad7a98c4b4908a8bfb02478e346a6
...
    Limits:
      cpu:     600m
      memory:  1Gi
    Requests:
      cpu:     200m
      memory:  800Mi
...

$ kubectl describe pod -n airflow twotaskeddagb-faa8831f58b94b2fa771b62cb6bb952e
...
    Limits:
      cpu:     600m
      memory:  1Gi
    Requests:
      cpu:     200m
      memory:  800Mi
...

{code}
 

The above shows that the last task in the DAG sets the resource requests and limits for the first task, even though each task has its own executor_config. This also happens when task A has no executor_config set at all.

 

It is also worth mentioning (although maybe not relevant) that I am setting default pod resources using an airflow_local_settings.py file:

 
{code:python}
from airflow.utils.log.logging_mixin import LoggingMixin
from deepmerge import Merger


DEFAULT_EXECUTOR_CONFIG = {
    "KubernetesExecutor": {
        "request_memory": "250Mi",
        "request_cpu": "250m",
        "limit_memory": "500Mi",
        "limit_cpu": "500m",
    }
}

merger = Merger(
    type_strategies=[(list, ["append"]), (dict, ["merge"])],
    fallback_strategies=["override"],
    type_conflict_strategies=["override"],
)

log = LoggingMixin().log


def policy(ti):
    """Add the default executor config to all task instances."""

    executor_config = getattr(ti, "executor_config", {})
    if executor_config:
        log.debug(f"Merging {ti.task_id} executor config with default executor config")
        ti.executor_config = merger.merge(DEFAULT_EXECUTOR_CONFIG, executor_config)
        log.debug(ti.executor_config)
    else:
        log.debug(f"Applying default executor config to {ti.task_id}")
        ti.executor_config = DEFAULT_EXECUTOR_CONFIG
        log.debug(ti.executor_config)
{code}
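
A possible contributing factor, not confirmed in this report: deepmerge's merge modifies its first argument in place and returns it, so the policy above may be mutating DEFAULT_EXECUTOR_CONFIG and handing the same dict object to every task. That would match the "last operator wins" symptom, including the case where task A has no executor_config. The sketch below shows a non-destructive alternative under that assumption. It uses only the standard library; merged_with_defaults and its inner helper are hypothetical names for illustration, and unlike the Merger configuration above it only merges nested dicts and overrides everything else (lists are not appended).

```python
import copy

# Defaults copied from the airflow_local_settings.py shown in the report.
DEFAULT_EXECUTOR_CONFIG = {
    "KubernetesExecutor": {
        "request_memory": "250Mi",
        "request_cpu": "250m",
        "limit_memory": "500Mi",
        "limit_cpu": "500m",
    }
}


def merged_with_defaults(overrides):
    """Return a NEW dict: the defaults overlaid with a task's overrides.

    Hypothetical helper; it never mutates DEFAULT_EXECUTOR_CONFIG and never
    returns an object shared between tasks.
    """

    def _merge(base, extra):
        for key, value in extra.items():
            if isinstance(value, dict) and isinstance(base.get(key), dict):
                # Recurse into nested dicts so unrelated keys keep their defaults.
                _merge(base[key], value)
            else:
                # Anything else (strings, lists, type conflicts) is overridden.
                base[key] = copy.deepcopy(value)
        return base

    # Copy first so each caller gets its own independent config.
    return _merge(copy.deepcopy(DEFAULT_EXECUTOR_CONFIG), overrides or {})


# Two tasks with different overrides, and one with none at all.
config_a = merged_with_defaults({"KubernetesExecutor": {"request_memory": "700Mi"}})
config_b = merged_with_defaults({"KubernetesExecutor": {"request_memory": "800Mi"}})
config_c = merged_with_defaults(None)
```

Inside the policy function, the same idea would amount to merging into copy.deepcopy(DEFAULT_EXECUTOR_CONFIG) rather than into the module-level dict, and assigning a copy in the else branch. Whether this removes the behaviour described above has not been verified here.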
 

 


