Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/19 16:01:53 UTC

[GitHub] [airflow] fzkhouy opened a new issue, #27147: SparkKubernetesOperator: application_file does not work as a ".yaml" file

fzkhouy opened a new issue, #27147:
URL: https://github.com/apache/airflow/issues/27147

   ### Apache Airflow version
   
   2.4.1
   
   ### What happened
   
   **When defining the Kubernetes 'custom_resource_definition' of 'sparkApplication' as a path to a '.yaml' file:**
   the DAG fails with this error:
   ```
   [2022-10-19, 09:19:21 +01] {taskinstance.py:1851} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 76, in execute
       namespace=self.namespace,
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 324, in create_custom_object
       name=body_dict["metadata"]["name"],
   TypeError: string indices must be integers
   ```
   
   
   
   ### What you think should happen instead
   
   The expected behavior, as described in the Airflow documentation, is to read the application_file ".yaml" and resolve `name=body["metadata"]["name"]`; in my case:
   ```
   metadata:
     name: spark
   ```
   That way the DAG can send a request to the Kubernetes cluster to execute the Spark application.
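
   For illustration, a minimal sketch of that expectation (plain PyYAML here, assumed to behave like `airflow.utils.yaml`'s `safe_load`): once the file *content* is parsed, `body` is a dict and the name can be read from it:
   ```
   import yaml  # PyYAML

   # Parsing the YAML content (not the path) yields a dict,
   # so the metadata name is accessible as expected.
   content = "metadata:\n  name: spark\n"
   body = yaml.safe_load(content)
   assert body["metadata"]["name"] == "spark"
   ```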
   
   ### How to reproduce
   
   - Set up the connection with the Kubernetes cluster
   - Create a DAG using SparkKubernetesOperator
   - Provide your 'sparkApplication' as a path to a '.yaml' file
   - Execute the DAG; the same problem should occur
   
   ### Operating System
   
   ubuntu 20.04
   
   ### Versions of Apache Airflow Providers
   
   `apache-airflow-providers-cncf-kubernetes==4.4.0`
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   - Docker Compose deployment using a customized Airflow image (just adding the requirements already mentioned)
   - The Docker container runs on a Linux server, and we connect to a k8s cluster so we can execute our Spark applications
    
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] boring-cyborg[bot] commented on issue #27147: SparkKubernetesOperator: application_file does not work as a ".yaml" file

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #27147:
URL: https://github.com/apache/airflow/issues/27147#issuecomment-1284244063

   Thanks for opening your first issue here! Be sure to follow the issue template!
   




[GitHub] [airflow] potiuk commented on issue #27147: SparkKubernetesOperator: Dag fails when application_file sent as a ".yaml" file

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #27147:
URL: https://github.com/apache/airflow/issues/27147#issuecomment-1291256560

   Yep. This is as expected. @fzkhouy  You should add this:
   
   ```
   template_searchpath='/opt/airflow/kubernetes/templates/' 
   ```
   
   to your DAG object and use `airflow-worker-template.yaml` as your template name.
   
   Look at our docs: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/fundamentals.html#templating-with-jinja - it is clearly described there.
   
   We are using a "file loader" in Jinja, and it expects a separate "template directory" and a template name.
   
   Closing as invalid. Please explain if I misunderstood; we can still re-open it, but from the examples you posted it is just wrong usage. If you have other examples that correctly use templates, you can post them here and explain exactly what you want to do and the problem you see (following the examples). Maybe this is a different problem that we misunderstood.
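
   For illustration, a minimal sketch of that suggestion (the directory, connection id and namespace below are taken from the reporter's example, so treat them as assumptions about the deployment):

   ```python
   from datetime import datetime, timedelta

   from airflow import DAG
   from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

   with DAG(
       dag_id="spark_pi",
       schedule=timedelta(days=1),
       start_date=datetime(2021, 1, 1),
       catchup=False,
       # directory where Jinja's file loader looks for templates
       template_searchpath="/opt/airflow/kubernetes/templates/",
   ) as dag:
       SparkKubernetesOperator(
           task_id="main_app",
           kubernetes_conn_id="kubernetes_cluster",
           namespace="spark",
           # template *name* relative to template_searchpath, not an absolute path
           application_file="airflow-worker-template.yaml",
           do_xcom_push=True,
       )
   ```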




[GitHub] [airflow] tirkarthi commented on issue #27147: SparkKubernetesOperator: Dag fails when application_file sent as a ".yaml" file

Posted by GitBox <gi...@apache.org>.
tirkarthi commented on issue #27147:
URL: https://github.com/apache/airflow/issues/27147#issuecomment-1285733237

   I get the error below when trying to reproduce this with the posted dag file. The config file should be relative to the dag file, or it should be present under the path set in template_searchpath. The application_file should be replaced with the file content in the code linked below by the `resolve_template_files` function, which checks for the extensions .yaml, .json and .yml. If I add a trailing space after the ".yaml" extension, I can reproduce the error: the extension doesn't match, so application_file is left as "/opt/airflow/kubernetes/templates/airflow-worker-template.yaml" instead of the contents of the yaml file.
   * Can you please clarify if I am missing some configuration?
   * Can you please try with airflow-worker-template.yaml placed relative to the dag file and `application_file="airflow-worker-template.yaml"` without any trailing spaces? You can also set a breakpoint in the file below to see what `resolve_template_files` does.
   
   https://github.com/apache/airflow/blob/b9e133e40c2848b0d555051a99bf8d2816fd28a7/airflow/models/abstractoperator.py#L164-L185
   
   ```
   airflow dags test spark_pi
   [2022-10-20 15:06:45,774] {dagbag.py:537} INFO - Filling up the DagBag from /files/dags
   [2022-10-20 15:06:45,792] {abstractoperator.py:176} ERROR - Failed to resolve template field '\x1b[01mapplication_file\x1b[22m'
   Traceback (most recent call last):
     File "/opt/airflow/airflow/models/abstractoperator.py", line 174, in resolve_template_files
       setattr(self, field, env.loader.get_source(env, content)[0])  # type: ignore
     File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 218, in get_source
       raise TemplateNotFound(template)
   jinja2.exceptions.TemplateNotFound: /opt/airflow/kubernetes/templates/airflow-worker-template.yaml
   [2022-10-20 15:06:45,873] {dag.py:3654} INFO - dagrun id: spark_pi
   /opt/airflow/airflow/models/dag.py:3669 RemovedInAirflow3Warning: Calling `DAG.create_dagrun()` without an explicit data interval is deprecated
   [2022-10-20 15:06:45,903] {dag.py:3671} INFO - created dagrun <DagRun spark_pi @ 2022-10-20T15:06:45.774111+00:00: manual__2022-10-20T15:06:45.774111+00:00, state:running, queued_at: None. externally triggered: False>
   [2022-10-20 15:06:45,925] {dag.py:3621} INFO - *****************************************************
   [2022-10-20 15:06:45,925] {dag.py:3625} INFO - Running task airflow_health_check
   [2022-10-20 15:06:46,334] {taskinstance.py:1587} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=spark_pi
   AIRFLOW_CTX_TASK_ID=airflow_health_check
   AIRFLOW_CTX_EXECUTION_DATE=2022-10-20T15:06:45.774111+00:00
   AIRFLOW_CTX_TRY_NUMBER=1
   AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-20T15:06:45.774111+00:00
   [2022-10-20 15:06:46,334] {taskinstance.py:1587} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=spark_pi
   AIRFLOW_CTX_TASK_ID=airflow_health_check
   AIRFLOW_CTX_EXECUTION_DATE=2022-10-20T15:06:45.774111+00:00
   AIRFLOW_CTX_TRY_NUMBER=1
   AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-20T15:06:45.774111+00:00
   [2022-10-20 15:06:46,341] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=spark_pi, task_id=airflow_health_check, execution_date=20221020T150645, start_date=, end_date=20221020T150646
   [2022-10-20 15:06:46,341] {taskinstance.py:1401} INFO - Marking task as SUCCESS. dag_id=spark_pi, task_id=airflow_health_check, execution_date=20221020T150645, start_date=, end_date=20221020T150646
   [2022-10-20 15:06:46,360] {dag.py:3629} INFO - airflow_health_check ran successfully!
   [2022-10-20 15:06:46,360] {dag.py:3632} INFO - *****************************************************
   [2022-10-20 15:06:46,369] {dag.py:3621} INFO - *****************************************************
   [2022-10-20 15:06:46,370] {dag.py:3625} INFO - Running task main_app
   [2022-10-20 15:06:46,390] {abstractoperator.py:422} ERROR - Exception rendering Jinja template for task 'main_app', field 'application_file'. Template: '/opt/airflow/kubernetes/templates/airflow-worker-template.yaml'
   Traceback (most recent call last):
     File "/opt/airflow/airflow/models/abstractoperator.py", line 415, in _do_render_template_fields
       seen_oids,
     File "/opt/airflow/airflow/models/abstractoperator.py", line 466, in render_template
       template = jinja_env.get_template(value)
     File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 1010, in get_template
       return self._load_template(name, globals)
     File "/usr/local/lib/python3.7/site-packages/jinja2/environment.py", line 969, in _load_template
       template = self.loader.load(self, name, self.make_globals(globals))
     File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 126, in load
       source, filename, uptodate = self.get_source(environment, name)
     File "/usr/local/lib/python3.7/site-packages/jinja2/loaders.py", line 218, in get_source
       raise TemplateNotFound(template)
   jinja2.exceptions.TemplateNotFound: /opt/airflow/kubernetes/templates/airflow-worker-template.yaml
   ```
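
   Not the exact Airflow code, just a sketch of the suffix check that decides whether the rendered value is treated as a template file to load:

   ```python
   # A trailing space defeats the extension match, so the raw string is kept
   # instead of the file content being loaded.
   TEMPLATE_EXTENSIONS = (".yaml", ".yml", ".json")

   print("airflow-worker-template.yaml".endswith(TEMPLATE_EXTENSIONS))   # True  -> treated as a template file
   print("airflow-worker-template.yaml ".endswith(TEMPLATE_EXTENSIONS))  # False -> passed through as a plain string
   ```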




[GitHub] [airflow] tirkarthi commented on issue #27147: SparkKubernetesOperator: Dag fails when application_file sent as a ".yaml" file

Posted by GitBox <gi...@apache.org>.
tirkarthi commented on issue #27147:
URL: https://github.com/apache/airflow/issues/27147#issuecomment-1285131240

   Can you please add a sample dag file to reproduce this? I tried the code below with "config.yaml" relative to the dag file and a print statement in the airflow code before the yaml parsing at https://github.com/apache/airflow/blob/b9e133e40c2848b0d555051a99bf8d2816fd28a7/airflow/providers/cncf/kubernetes/hooks/kubernetes.py#L281-L284, and I was able to see the yaml content.
   
   ```python
   import datetime
   
   from airflow.decorators import dag
   from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
   
   @dag(start_date=datetime.datetime(2021, 1, 1))
   def mydag():
       op = SparkKubernetesOperator(
               application_file="config.yaml",
               kubernetes_conn_id='kubernetes_with_namespace',
               task_id='test_task_id',
           )
   
   mydag()
   ```
   
   ```
   airflow dags test mydag
   [2022-10-20 08:18:58,772] {dagbag.py:537} INFO - Filling up the DagBag from /files/dags
   [2022-10-20 08:18:58,864] {dag.py:3654} INFO - dagrun id: mydag
   /opt/airflow/airflow/models/dag.py:3669 RemovedInAirflow3Warning: Calling `DAG.create_dagrun()` without an explicit data interval is deprecated
   [2022-10-20 08:18:58,893] {dag.py:3671} INFO - created dagrun <DagRun mydag @ 2022-10-20T08:18:58.772022+00:00: manual__2022-10-20T08:18:58.772022+00:00, state:running, queued_at: None. externally triggered: False>
   [2022-10-20 08:18:58,905] {dag.py:3621} INFO - *****************************************************
   [2022-10-20 08:18:58,905] {dag.py:3625} INFO - Running task test_task_id
   [2022-10-20 08:18:59,323] {taskinstance.py:1587} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=mydag
   AIRFLOW_CTX_TASK_ID=test_task_id
   AIRFLOW_CTX_EXECUTION_DATE=2022-10-20T08:18:58.772022+00:00
   AIRFLOW_CTX_TRY_NUMBER=1
   AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-20T08:18:58.772022+00:00
   [2022-10-20 08:18:59,323] {taskinstance.py:1587} INFO - Exporting the following env vars:
   AIRFLOW_CTX_DAG_OWNER=airflow
   AIRFLOW_CTX_DAG_ID=mydag
   AIRFLOW_CTX_TASK_ID=test_task_id
   AIRFLOW_CTX_EXECUTION_DATE=2022-10-20T08:18:58.772022+00:00
   AIRFLOW_CTX_TRY_NUMBER=1
   AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-20T08:18:58.772022+00:00
   [2022-10-20 08:18:59,323] {spark_kubernetes.py:70} INFO - Creating sparkApplication
   [2022-10-20 08:18:59,323] {spark_kubernetes.py:70} INFO - Creating sparkApplication
   apiVersion: "sparkoperator.k8s.io/v1beta2"
   kind: SparkApplication
   metadata:
     name: spark-pi
     namespace: default
   spec:
     type: Scala
     mode: cluster
     image: "gcr.io/spark-operator/spark:v2.4.5"
     imagePullPolicy: Always
     mainClass: org.apache.spark.examples.SparkPi
     mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar"
     sparkVersion: "2.4.5"
     restartPolicy:
       type: Never
     volumes:
       - name: "test-volume"
         hostPath:
           path: "/tmp"
           type: Directory
     driver:
       cores: 1
       coreLimit: "1200m"
       memory: "512m"
       labels:
         version: 2.4.5
       serviceAccount: spark
       volumeMounts:
         - name: "test-volume"
           mountPath: "/tmp"
     executor:
       cores: 1
       instances: 1
       memory: "512m"
       labels:
         version: 2.4.5
       volumeMounts:
         - name: "test-volume"
           mountPath: "/tmp"
   ```




[GitHub] [airflow] fzkhouy commented on issue #27147: SparkKubernetesOperator: Dag fails when application_file sent as a ".yaml" file

Posted by GitBox <gi...@apache.org>.
fzkhouy commented on issue #27147:
URL: https://github.com/apache/airflow/issues/27147#issuecomment-1285531033

   > Can you please add a sample dag file to reproduce this? I tried below code with "config.yaml" relative to the dag file and with a print statement in airflow code before the yaml parsing at
   > 
   > https://github.com/apache/airflow/blob/b9e133e40c2848b0d555051a99bf8d2816fd28a7/airflow/providers/cncf/kubernetes/hooks/kubernetes.py#L281-L284
   > . I was able to see the yaml content.
   > 
   > Edit : Please also ensure there is no trailing space in the filename after yaml extension which might also cause this.
   > 
   > ```python
   > import datetime
   > 
   > from airflow.decorators import dag
   > from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
   > 
   > @dag(start_date=datetime.datetime(2021, 1, 1))
   > def mydag():
   >     op = SparkKubernetesOperator(
   >             application_file="config.yaml",
   >             kubernetes_conn_id='kubernetes_with_namespace',
   >             task_id='test_task_id',
   >         )
   > 
   > mydag()
   > ```
   > 
   > ```
   > airflow dags test mydag
   > [2022-10-20 08:18:58,772] {dagbag.py:537} INFO - Filling up the DagBag from /files/dags
   > [2022-10-20 08:18:58,864] {dag.py:3654} INFO - dagrun id: mydag
   > /opt/airflow/airflow/models/dag.py:3669 RemovedInAirflow3Warning: Calling `DAG.create_dagrun()` without an explicit data interval is deprecated
   > [2022-10-20 08:18:58,893] {dag.py:3671} INFO - created dagrun <DagRun mydag @ 2022-10-20T08:18:58.772022+00:00: manual__2022-10-20T08:18:58.772022+00:00, state:running, queued_at: None. externally triggered: False>
   > [2022-10-20 08:18:58,905] {dag.py:3621} INFO - *****************************************************
   > [2022-10-20 08:18:58,905] {dag.py:3625} INFO - Running task test_task_id
   > [2022-10-20 08:18:59,323] {taskinstance.py:1587} INFO - Exporting the following env vars:
   > AIRFLOW_CTX_DAG_OWNER=airflow
   > AIRFLOW_CTX_DAG_ID=mydag
   > AIRFLOW_CTX_TASK_ID=test_task_id
   > AIRFLOW_CTX_EXECUTION_DATE=2022-10-20T08:18:58.772022+00:00
   > AIRFLOW_CTX_TRY_NUMBER=1
   > AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-20T08:18:58.772022+00:00
   > [2022-10-20 08:18:59,323] {taskinstance.py:1587} INFO - Exporting the following env vars:
   > AIRFLOW_CTX_DAG_OWNER=airflow
   > AIRFLOW_CTX_DAG_ID=mydag
   > AIRFLOW_CTX_TASK_ID=test_task_id
   > AIRFLOW_CTX_EXECUTION_DATE=2022-10-20T08:18:58.772022+00:00
   > AIRFLOW_CTX_TRY_NUMBER=1
   > AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-20T08:18:58.772022+00:00
   > [2022-10-20 08:18:59,323] {spark_kubernetes.py:70} INFO - Creating sparkApplication
   > [2022-10-20 08:18:59,323] {spark_kubernetes.py:70} INFO - Creating sparkApplication
   > apiVersion: "sparkoperator.k8s.io/v1beta2"
   > kind: SparkApplication
   > metadata:
   >   name: spark-pi
   >   namespace: default
   > spec:
   >   type: Scala
   >   mode: cluster
   >   image: "gcr.io/spark-operator/spark:v2.4.5"
   >   imagePullPolicy: Always
   >   mainClass: org.apache.spark.examples.SparkPi
   >   mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar"
   >   sparkVersion: "2.4.5"
   >   restartPolicy:
   >     type: Never
   >   volumes:
   >     - name: "test-volume"
   >       hostPath:
   >         path: "/tmp"
   >         type: Directory
   >   driver:
   >     cores: 1
   >     coreLimit: "1200m"
   >     memory: "512m"
   >     labels:
   >       version: 2.4.5
   >     serviceAccount: spark
   >     volumeMounts:
   >       - name: "test-volume"
   >         mountPath: "/tmp"
   >   executor:
   >     cores: 1
   >     instances: 1
   >     memory: "512m"
   >     labels:
   >       version: 2.4.5
   >     volumeMounts:
   >       - name: "test-volume"
   >         mountPath: "/tmp"
   > ```
   
   **Thank you for your reply. This is the dag file that reproduces the problem:**
   ```
   from __future__ import annotations
   from datetime import datetime, timedelta
   from airflow import DAG
   # Operators; we need this to operate!
   from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
   from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
   from airflow.operators.empty import EmptyOperator
   
   
   with DAG(
       dag_id="spark_pi",
       default_args={'max_active_runs': 1},
       description='submit spark-pi as sparkApplication on kubernetes',
       schedule=timedelta(days=1),
       start_date=datetime(2021, 1, 1),
       catchup=False,
   ) as dag:
       t0 = EmptyOperator(task_id='airflow_health_check', retries=1, dag=dag)
       t1 = SparkKubernetesOperator(
           task_id='main_app',
           kubernetes_conn_id='kubernetes_cluster',
           namespace="spark",
           application_file="/opt/airflow/kubernetes/templates/airflow-worker-template.yaml",
           do_xcom_push=True,
           dag=dag,
   
       )
       t2 = SparkKubernetesSensor(
           task_id='spark_pi_monitor',
           kubernetes_conn_id='kubernetes_cluster',
           namespace="spark",
           application_name="{{ task_instance.xcom_pull(task_ids='main_app')['metadata']['name'] }}",
           dag=dag,
       )
       t0 >> t1 >> t2
   ```
    **When debugging on my own, I found that the problem is in this function**
   https://github.com/apache/airflow/blob/b9e133e40c2848b0d555051a99bf8d2816fd28a7/airflow/providers/cncf/kubernetes/hooks/kubernetes.py#L33-L38
   
   When debugging it:

   - _reproducing the problem with this code (the actual code in airflow)_
   ```
   from airflow.utils import yaml
   body = yaml.safe_load("airflow-worker-template.yaml")
   ```
   In this case the body is just the path: printing it gives `"airflow-worker-template.yaml"`.
   So we get `TypeError: string indices must be integers` as soon as we try to access the "metadata"...
   - _resolving the problem_
   With the same code, I just pass a file object instead of the path to the function, like this:
   ```
   from airflow.utils import yaml
    with open("airflow-worker-template.yaml") as yaml_file:
       body = yaml.safe_load(yaml_file)
   ```
   In that case the body holds the yaml content, so we can access "metadata".
   Therefore, what I suggest is that the function verify whether the given value ends with ".yaml" (i.e. it is a path to a file); if so, open and parse that file, otherwise keep the current behavior.
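
   A minimal sketch of that proposed check (a hypothetical helper, not the provider's current code):
   ```
   from airflow.utils import yaml

   # Hypothetical helper illustrating the suggestion above: if the rendered
   # value still looks like a path to a YAML/JSON file, open and parse that
   # file; otherwise parse it as inline content (the current behaviour).
   def load_application_body(application_file: str) -> dict:
       if application_file.endswith((".yaml", ".yml", ".json")):
           with open(application_file) as yaml_file:
               return yaml.safe_load(yaml_file)
       return yaml.safe_load(application_file)
   ```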




[GitHub] [airflow] potiuk closed issue #27147: SparkKubernetesOperator: Dag fails when application_file sent as a ".yaml" file

Posted by GitBox <gi...@apache.org>.
potiuk closed issue #27147: SparkKubernetesOperator: Dag fails when application_file sent as a ".yaml" file
URL: https://github.com/apache/airflow/issues/27147

