Posted to commits@airflow.apache.org by "Jun Xie (Jira)" <ji...@apache.org> on 2020/01/20 11:33:00 UTC
[jira] [Work started] (AIRFLOW-6602) Make "executor_config" templated field to support dynamic parameters for kubernetes executor
[ https://issues.apache.org/jira/browse/AIRFLOW-6602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Work on AIRFLOW-6602 started by Jun Xie.
----------------------------------------
> Make "executor_config" templated field to support dynamic parameters for kubernetes executor
> --------------------------------------------------------------------------------------------
>
> Key: AIRFLOW-6602
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6602
> Project: Apache Airflow
> Issue Type: New Feature
> Components: executor-kubernetes, executors
> Affects Versions: 1.10.7
> Reporter: Jun Xie
> Assignee: Jun Xie
> Priority: Major
>
> When running airflow with Kubernetes Executor, one specifies the configurations through
> "executor_config". At the moment, this field is not templated, which means it cannot carry dynamic parameters. As an experiment, I created a MyPythonOperator that inherits from PythonOperator, with "executor_config" added to its template_fields. However, this change by itself is not enough: Airflow first creates a pod from the unrendered executor_config, and only then runs the task inside that pod (it is the task run that triggers the Jinja template rendering).
> See the example config below for a use case where one mounts a dynamic "subPath" into the image.
>
> {code:python}
> executor_config = {
>     "KubernetesExecutor": {
>         "image": "some_image",
>         "request_memory": "2Gi",
>         "request_cpu": "1",
>         "volumes": [
>             {
>                 "name": "data",
>                 "persistentVolumeClaim": {"claimName": "some_claim_name"},
>             },
>         ],
>         "volume_mounts": [
>             {
>                 "mountPath": "/code",
>                 "name": "data",
>                 "subPath": "/code/{{ dag_run.conf['branch_name'] }}",
>             },
>         ],
>     }
> }
> {code}
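> The subclass from the experiment above can be sketched as follows. This is a minimal, self-contained illustration: StubOperator is a stand-in for PythonOperator so the snippet runs without an Airflow installation, with template_fields matching PythonOperator's defaults in 1.10.x.

```python
# Sketch of the template_fields experiment. Assumption: StubOperator
# stands in for airflow's PythonOperator, whose default
# template_fields in 1.10.x are the three names below.
class StubOperator:
    template_fields = ('templates_dict', 'op_args', 'op_kwargs')


class MyPythonOperator(StubOperator):
    # Extend the parent's template_fields so Jinja rendering is
    # attempted on "executor_config" as well. As noted above, this
    # alone is not enough, because the pod is created before the
    # task (and hence the rendering) runs.
    template_fields = StubOperator.template_fields + ('executor_config',)


print(MyPythonOperator.template_fields)
```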
>
>
>
> I then did a further experiment: in
> trigger_tasks() in airflow/executors/base_executor.py, right before execute_async() is called, I invoked simple_ti.render_templates(). This triggers the rendering, so kubernetes_executor.execute_async() picks up the resolved parameters.
>
> {code:python}
> # Current behavior
> for i in range(min((open_slots, len(self.queued_tasks)))):
>     key, (command, _, queue, simple_ti) = sorted_queue.pop(0)
>     self.queued_tasks.pop(key)
>     self.running[key] = command
>     self.execute_async(key=key,
>                        command=command,
>                        queue=queue,
>                        executor_config=simple_ti.executor_config)
> {code}
>
>
> {code:python}
> # Proposed new behavior
> for i in range(min((open_slots, len(self.queued_tasks)))):
>     key, (command, _, queue, simple_ti) = sorted_queue.pop(0)
>     self.queued_tasks.pop(key)
>     self.running[key] = command
>     simple_ti.render_templates()  # render it
>     self.execute_async(key=key,
>                        command=command,
>                        queue=queue,
>                        executor_config=simple_ti.executor_config)
> {code}
>
>
> I think this would be a very useful feature to include in Airflow, especially for implementing CI/CD pipelines: mounting a dynamic volume and/or subPath into the image opens up many other use cases.
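> What simple_ti.render_templates() effectively does to executor_config can be illustrated with Jinja directly. The recursive helper below is my own toy stand-in, not Airflow code, and it assumes the jinja2 package is installed:

```python
import jinja2


def render_nested(obj, context):
    """Toy stand-in for TaskInstance.render_templates(): recursively
    render Jinja templates found in strings inside a nested
    dict/list structure (not the actual Airflow implementation)."""
    if isinstance(obj, str):
        return jinja2.Template(obj).render(**context)
    if isinstance(obj, dict):
        return {k: render_nested(v, context) for k, v in obj.items()}
    if isinstance(obj, list):
        return [render_nested(v, context) for v in obj]
    return obj


# A fake dag_run carrying the conf referenced by the config above.
context = {"dag_run": {"conf": {"branch_name": "feature-x"}}}
config = {
    "volume_mounts": [
        {"subPath": "/code/{{ dag_run.conf['branch_name'] }}"},
    ],
}

rendered = render_nested(config, context)
print(rendered["volume_mounts"][0]["subPath"])  # /code/feature-x
```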
--
This message was sent by Atlassian Jira
(v8.3.4#803005)