Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/19 16:01:53 UTC
[GitHub] [airflow] fzkhouy opened a new issue, #27147: SparkKubernetesOperator: application_file does not work as a ".yaml" file
fzkhouy opened a new issue, #27147:
URL: https://github.com/apache/airflow/issues/27147
### Apache Airflow version
2.4.1
### What happened
**When defining the Kubernetes 'custom_resource_definition' of 'sparkApplication' as a path to a '.yaml' file:**
the dag fails with this error:
```
[2022-10-19, 09:19:21 +01] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 76, in execute
    namespace=self.namespace,
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 324, in create_custom_object
    name=body_dict["metadata"]["name"],
TypeError: string indices must be integers
```
### What you think should happen instead
The expected behavior, per the Airflow documentation, is to read the application_file ".yaml" and get `name=body["metadata"]["name"]`; in my case:
```yaml
metadata:
  name: spark
```
The dag can then send a request to the Kubernetes cluster to execute the Spark application.
### How to reproduce
- Set up the connection with kubernetes cluster
- Create a dag using SparkKubernetesOperator
- Provide your 'sparkApplication' as a path to a '.yaml' file
- Execute the dag; the same problem should occur
### Operating System
ubuntu 20.04
### Versions of Apache Airflow Providers
`apache-airflow-providers-cncf-kubernetes==4.4.0`
### Deployment
Docker-Compose
### Deployment details
- Docker Compose deployment using a customized Airflow image (just adding the requirements already mentioned)
- The Docker container runs on a Linux server, and we connect to a k8s cluster so we can execute our Spark applications
### Anything else
_No response_
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] boring-cyborg[bot] commented on issue #27147: SparkKubernetesOperator: application_file does not work as a ".yaml" file
boring-cyborg[bot] commented on issue #27147:
URL: https://github.com/apache/airflow/issues/27147#issuecomment-1284244063
Thanks for opening your first issue here! Be sure to follow the issue template!
[GitHub] [airflow] potiuk commented on issue #27147: SparkKubernetesOperator: Dag fails when application_file sent as a ".yaml" file
potiuk commented on issue #27147:
URL: https://github.com/apache/airflow/issues/27147#issuecomment-1291256560
Yep. This is as expected. @fzkhouy You should add this:
```
template_searchpath='/opt/airflow/kubernetes/templates/'
```
to your DAG object and use `airflow-worker-template.yaml` as your template name.
Look at our docs: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/fundamentals.html#templating-with-jinja - it is clearly described there.
We are using a "file loader" in Jinja, and it expects a separate "template directory" and template name.
Closing as invalid. Please explain if I misunderstood and we can still re-open it, but from the examples you posted this is just incorrect usage. If you have other examples that correctly use templates, you can post them here and explain exactly what you want to do and the problem you see (following the examples). Maybe this is a different problem that we misunderstood.
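In other words, the absolute path from the failing dag needs to be split into a search directory (given to the DAG) and a bare template name (given to the operator). A minimal stdlib sketch of that split, using the path from this thread (the commented DAG wiring is illustrative only and is not executed here, since it needs a live Airflow install):

```python
import os

# The full path used in the failing dag from this thread:
full_path = "/opt/airflow/kubernetes/templates/airflow-worker-template.yaml"

# Split into the directory Jinja should search and the bare template name.
search_path, template_name = os.path.split(full_path)

print(search_path)    # /opt/airflow/kubernetes/templates
print(template_name)  # airflow-worker-template.yaml

# These two pieces would then be wired into the DAG roughly as:
#   with DAG(..., template_searchpath=search_path) as dag:
#       t1 = SparkKubernetesOperator(..., application_file=template_name)
```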
[GitHub] [airflow] tirkarthi commented on issue #27147: SparkKubernetesOperator: Dag fails when application_file sent as a ".yaml" file
tirkarthi commented on issue #27147:
URL: https://github.com/apache/airflow/issues/27147#issuecomment-1285733237
I receive the below error when trying to reproduce this with the posted dag file. The config file should be relative to the dag file, or it should be present under a directory listed in `template_searchpath`. The application_file should be replaced with the file contents in the code linked below, via the `resolve_template_files` function, which checks the extensions .yaml, .json, and .yml. If I add a trailing space after the ".yaml" extension, I can reproduce the error: the extension no longer matches, so application_file is set to the literal string "/opt/airflow/kubernetes/templates/airflow-worker-template.yaml" instead of the contents of the yaml file.
* Can you please add details if I am missing some configuration?
* Can you please try with airflow-worker-template.yaml relative to the dag file and `application_file="airflow-worker-template.yaml"` without any trailing spaces? You can also set a breakpoint in the file below to see what `resolve_template_files` does.
https://github.com/apache/airflow/blob/b9e133e40c2848b0d555051a99bf8d2816fd28a7/airflow/models/abstractoperator.py#L164-L185
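The trailing-space failure mode described above can be sketched with a plain extension check (the tuple mirrors the extensions `resolve_template_files` looks for; this is an illustration, not Airflow's actual code):

```python
# Template files are recognized by extension; a stray trailing space defeats
# the match, so the raw string is kept instead of being replaced by file content.
template_ext = (".yaml", ".yml", ".json")

clean = "airflow-worker-template.yaml"
with_space = "airflow-worker-template.yaml "  # note the trailing space

print(clean.endswith(template_ext))       # True  -> would be loaded as a file
print(with_space.endswith(template_ext))  # False -> passed through as a string
```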
```
airflow dags test spark_pi
[2022-10-20 15:06:45,774] {dagbag.py:537} INFO - Filling up the DagBag from /files/dags
[2022-10-20 15:06:45,792] {abstractoperator.py:176} ERROR - Failed to resolve template field '\x1b[01mapplication_file\x1b[22m'
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/abstractoperator.py", line 174, in resolve_template_files
    setattr(self, field, env.loader.get_source(env, content)[0])  # type: ignore
  File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 218, in get_source
    raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: /opt/airflow/kubernetes/templates/airflow-worker-template.yaml
[2022-10-20 15:06:45,873] {dag.py:3654} INFO - dagrun id: spark_pi
/opt/airflow/airflow/models/dag.py:3669 RemovedInAirflow3Warning: Calling `DAG.create_dagrun()` without an explicit data interval is deprecated
[2022-10-20 15:06:45,903] {dag.py:3671} INFO - created dagrun <DagRun spark_pi @ 2022-10-20T15:06:45.774111+00:00: manual__2022-10-20T15:06:45.774111+00:00, state:running, queued_at: None. externally triggered: False>
[2022-10-20 15:06:45,925] {dag.py:3621} INFO - *****************************************************
[2022-10-20 15:06:45,925] {dag.py:3625} INFO - Running task airflow_health_check
[2022-10-20 15:06:46,334] {taskinstance.py:1587} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=spark_pi
AIRFLOW_CTX_TASK_ID=airflow_health_check
AIRFLOW_CTX_EXECUTION_DATE=2022-10-20T15:06:45.774111+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-20T15:06:45.774111+00:00
[2022-10-20 15:06:46,334] {taskinstance.py:1587} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=spark_pi
AIRFLOW_CTX_TASK_ID=airflow_health_check
AIRFLOW_CTX_EXECUTION_DATE=2022-10-20T15:06:45.774111+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-20T15:06:45.774111+00:00
[2022-10-20 15:06:46,341] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=spark_pi, task_id=airflow_health_check, execution_date=20221020T150645, start_date=, end_date=20221020T150646
[2022-10-20 15:06:46,341] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=spark_pi, task_id=airflow_health_check, execution_date=20221020T150645, start_date=, end_date=20221020T150646
[2022-10-20 15:06:46,360] {dag.py:3629} INFO - airflow_health_check ran successfully!
[2022-10-20 15:06:46,360] {dag.py:3632} INFO - *****************************************************
[2022-10-20 15:06:46,369] {dag.py:3621} INFO - *****************************************************
[2022-10-20 15:06:46,370] {dag.py:3625} INFO - Running task main_app
[2022-10-20 15:06:46,390] {abstractoperator.py:422} ERROR - Exception rendering Jinja template for task 'main_app', field 'application_file'. Template: '/opt/airflow/kubernetes/templates/airflow-worker-template.yaml'
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/abstractoperator.py", line 415, in _do_render_template_fields
    seen_oids,
  File "/opt/airflow/airflow/models/abstractoperator.py", line 466, in render_template
    template = jinja_env.get_template(value)
  File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 1010, in get_template
    return self._load_template(name, globals)
  File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 969, in _load_template
    template = self.loader.load(self, name, self.make_globals(globals))
  File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 126, in load
    source, filename, uptodate = self.get_source(environment, name)
  File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 218, in get_source
    raise TemplateNotFound(template)
jinja2.exceptions.TemplateNotFound: /opt/airflow/kubernetes/templates/airflow-worker-template.yaml
```
[GitHub] [airflow] tirkarthi commented on issue #27147: SparkKubernetesOperator: Dag fails when application_file sent as a ".yaml" file
tirkarthi commented on issue #27147:
URL: https://github.com/apache/airflow/issues/27147#issuecomment-1285131240
Can you please add a sample dag file to reproduce this? I tried the below code with "config.yaml" relative to the dag file, and with a print statement in the Airflow code before the yaml parsing at https://github.com/apache/airflow/blob/b9e133e40c2848b0d555051a99bf8d2816fd28a7/airflow/providers/cncf/kubernetes/hooks/kubernetes.py#L281-L284, I was able to see the yaml content.
```python
import datetime

from airflow.decorators import dag
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator


@dag(start_date=datetime.datetime(2021, 1, 1))
def mydag():
    op = SparkKubernetesOperator(
        application_file="config.yaml",
        kubernetes_conn_id='kubernetes_with_namespace',
        task_id='test_task_id',
    )


mydag()
```
```
airflow dags test mydag
[2022-10-20 08:18:58,772] {dagbag.py:537} INFO - Filling up the DagBag from /files/dags
[2022-10-20 08:18:58,864] {dag.py:3654} INFO - dagrun id: mydag
/opt/airflow/airflow/models/dag.py:3669 RemovedInAirflow3Warning: Calling `DAG.create_dagrun()` without an explicit data interval is deprecated
[2022-10-20 08:18:58,893] {dag.py:3671} INFO - created dagrun <DagRun mydag @ 2022-10-20T08:18:58.772022+00:00: manual__2022-10-20T08:18:58.772022+00:00, state:running, queued_at: None. externally triggered: False>
[2022-10-20 08:18:58,905] {dag.py:3621} INFO - *****************************************************
[2022-10-20 08:18:58,905] {dag.py:3625} INFO - Running task test_task_id
[2022-10-20 08:18:59,323] {taskinstance.py:1587} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=mydag
AIRFLOW_CTX_TASK_ID=test_task_id
AIRFLOW_CTX_EXECUTION_DATE=2022-10-20T08:18:58.772022+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-20T08:18:58.772022+00:00
[2022-10-20 08:18:59,323] {taskinstance.py:1587} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=mydag
AIRFLOW_CTX_TASK_ID=test_task_id
AIRFLOW_CTX_EXECUTION_DATE=2022-10-20T08:18:58.772022+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-20T08:18:58.772022+00:00
[2022-10-20 08:18:59,323] {spark_kubernetes.py:70} INFO - Creating sparkApplication
[2022-10-20 08:18:59,323] {spark_kubernetes.py:70} INFO - Creating sparkApplication
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.5"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar"
  sparkVersion: "2.4.5"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 2.4.5
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.5
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
```
[GitHub] [airflow] fzkhouy commented on issue #27147: SparkKubernetesOperator: Dag fails when application_file sent as a ".yaml" file
fzkhouy commented on issue #27147:
URL: https://github.com/apache/airflow/issues/27147#issuecomment-1285531033
> Can you please add a sample dag file to reproduce this? I tried below code with "config.yaml" relative to the dag file and with a print statement in airflow code before the yaml parsing at
>
> https://github.com/apache/airflow/blob/b9e133e40c2848b0d555051a99bf8d2816fd28a7/airflow/providers/cncf/kubernetes/hooks/kubernetes.py#L281-L284
> . I was able to see the yaml content.
>
> Edit : Please also ensure there is no trailing space in the filename after yaml extension which might also cause this.
**Thank you for your reply. This is the dag file that reproduces the problem:**
```python
from __future__ import annotations

from datetime import datetime, timedelta

from airflow import DAG
# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="spark_pi",
    default_args={'max_active_runs': 1},
    description='submit spark-pi as sparkApplication on kubernetes',
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    t0 = EmptyOperator(task_id='airflow_health_check', retries=1, dag=dag)
    t1 = SparkKubernetesOperator(
        task_id='main_app',
        kubernetes_conn_id='kubernetes_cluster',
        namespace="spark",
        application_file="/opt/airflow/kubernetes/templates/airflow-worker-template.yaml",
        do_xcom_push=True,
        dag=dag,
    )
    t2 = SparkKubernetesSensor(
        task_id='spark_pi_monitor',
        kubernetes_conn_id='kubernetes_cluster',
        namespace="spark",
        application_name="{{ task_instance.xcom_pull(task_ids='main_app')['metadata']['name'] }}",
        dag=dag,
    )
    t0 >> t1 >> t2
```
**When debugging on my own, I found that the problem is in this function:**
https://github.com/apache/airflow/blob/b9e133e40c2848b0d555051a99bf8d2816fd28a7/airflow/providers/cncf/kubernetes/hooks/kubernetes.py#L33-L38
When debugging it:
- _reproducing the problem with this code (the actual code in airflow)_
```python
from airflow.utils import yaml

body = yaml.safe_load("airflow-worker-template.yaml")
```
The body in this case is the path itself; printing it gives `"airflow-worker-template.yaml"`. So we get `TypeError: string indices must be integers` when we try to access "metadata"...
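The failing step can be reproduced without Airflow at all: YAML parses a bare path as a scalar string, and indexing a Python str with a string key is exactly what raises this TypeError. A stdlib-only sketch (no PyYAML needed):

```python
# yaml.safe_load("airflow-worker-template.yaml") returns the string itself,
# because a bare path is valid YAML for a scalar. Indexing that string the
# way create_custom_object does is what raises the TypeError:
body = "airflow-worker-template.yaml"  # stand-in for safe_load's return value

try:
    body["metadata"]["name"]
except TypeError as exc:
    # e.g. "string indices must be integers" (exact wording varies by Python version)
    print(type(exc).__name__)
```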
- _resolving the problem_
With the same code, I just pass the open yaml file object instead of the path to the function, like this:
```python
from airflow.utils import yaml

with open("airflow-worker-template.yaml") as yaml_file:
    body = yaml.safe_load(yaml_file)
```
In that case the body contains the yaml content, so we can access "metadata".
Therefore, what I suggest is that the function should verify whether the given value ends with ".yaml" (which would mean it is a path to a file we can open); otherwise, it should proceed with the current behavior.
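A rough, hypothetical sketch of that suggested dispatch (`load_body` is a made-up name, and json stands in for YAML parsing so the example runs with the stdlib alone):

```python
import json
import tempfile

def load_body(application_file: str) -> dict:
    # Hypothetical version of the suggested check: if the value looks like a
    # path to a template file, open and parse the file; otherwise parse the
    # string itself. json stands in for yaml to keep this sketch stdlib-only.
    if application_file.endswith((".yaml", ".yml", ".json")):
        with open(application_file) as f:
            return json.load(f)
    return json.loads(application_file)

# A stand-in "application file" on disk:
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    f.write('{"metadata": {"name": "spark"}}')
    path = f.name

body = load_body(path)
print(body["metadata"]["name"])  # spark
```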
[GitHub] [airflow] potiuk closed issue #27147: SparkKubernetesOperator: Dag fails when application_file sent as a ".yaml" file
potiuk closed issue #27147: SparkKubernetesOperator: Dag fails when application_file sent as a ".yaml" file
URL: https://github.com/apache/airflow/issues/27147