Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/20 13:21:54 UTC

[GitHub] [airflow] fzkhouy commented on issue #27147: SparkKubernetesOperator: Dag fails when application_file sent as a ".yaml" file

fzkhouy commented on issue #27147:
URL: https://github.com/apache/airflow/issues/27147#issuecomment-1285531033

   > Can you please add a sample dag file to reproduce this? I tried below code with "config.yaml" relative to the dag file and with a print statement in airflow code before the yaml parsing at
   > 
   > https://github.com/apache/airflow/blob/b9e133e40c2848b0d555051a99bf8d2816fd28a7/airflow/providers/cncf/kubernetes/hooks/kubernetes.py#L281-L284
   > . I was able to see the yaml content.
   > 
   > Edit : Please also ensure there is no trailing space in the filename after yaml extension which might also cause this.
   > 
   > ```python
   > import datetime
   > 
   > from airflow.decorators import dag
   > from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
   > 
   > @dag(start_date=datetime.datetime(2021, 1, 1))
   > def mydag():
   >     op = SparkKubernetesOperator(
   >             application_file="config.yaml",
   >             kubernetes_conn_id='kubernetes_with_namespace',
   >             task_id='test_task_id',
   >         )
   > 
   > mydag()
   > ```
   > 
   > ```
   > airflow dags test mydag
   > [2022-10-20 08:18:58,772] {dagbag.py:537} INFO - Filling up the DagBag from /files/dags
   > [2022-10-20 08:18:58,864] {dag.py:3654} INFO - dagrun id: mydag
   > /opt/airflow/airflow/models/dag.py:3669 RemovedInAirflow3Warning: Calling `DAG.create_dagrun()` without an explicit data interval is deprecated
   > [2022-10-20 08:18:58,893] {dag.py:3671} INFO - created dagrun <DagRun mydag @ 2022-10-20T08:18:58.772022+00:00: manual__2022-10-20T08:18:58.772022+00:00, state:running, queued_at: None. externally triggered: False>
   > [2022-10-20 08:18:58,905] {dag.py:3621} INFO - *****************************************************
   > [2022-10-20 08:18:58,905] {dag.py:3625} INFO - Running task test_task_id
   > [2022-10-20 08:18:59,323] {taskinstance.py:1587} INFO - Exporting the following env vars:
   > AIRFLOW_CTX_DAG_OWNER=airflow
   > AIRFLOW_CTX_DAG_ID=mydag
   > AIRFLOW_CTX_TASK_ID=test_task_id
   > AIRFLOW_CTX_EXECUTION_DATE=2022-10-20T08:18:58.772022+00:00
   > AIRFLOW_CTX_TRY_NUMBER=1
   > AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-20T08:18:58.772022+00:00
   > [2022-10-20 08:18:59,323] {taskinstance.py:1587} INFO - Exporting the following env vars:
   > AIRFLOW_CTX_DAG_OWNER=airflow
   > AIRFLOW_CTX_DAG_ID=mydag
   > AIRFLOW_CTX_TASK_ID=test_task_id
   > AIRFLOW_CTX_EXECUTION_DATE=2022-10-20T08:18:58.772022+00:00
   > AIRFLOW_CTX_TRY_NUMBER=1
   > AIRFLOW_CTX_DAG_RUN_ID=manual__2022-10-20T08:18:58.772022+00:00
   > [2022-10-20 08:18:59,323] {spark_kubernetes.py:70} INFO - Creating sparkApplication
   > [2022-10-20 08:18:59,323] {spark_kubernetes.py:70} INFO - Creating sparkApplication
   > apiVersion: "sparkoperator.k8s.io/v1beta2"
   > kind: SparkApplication
   > metadata:
   >   name: spark-pi
   >   namespace: default
   > spec:
   >   type: Scala
   >   mode: cluster
   >   image: "gcr.io/spark-operator/spark:v2.4.5"
   >   imagePullPolicy: Always
   >   mainClass: org.apache.spark.examples.SparkPi
   >   mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar"
   >   sparkVersion: "2.4.5"
   >   restartPolicy:
   >     type: Never
   >   volumes:
   >     - name: "test-volume"
   >       hostPath:
   >         path: "/tmp"
   >         type: Directory
   >   driver:
   >     cores: 1
   >     coreLimit: "1200m"
   >     memory: "512m"
   >     labels:
   >       version: 2.4.5
   >     serviceAccount: spark
   >     volumeMounts:
   >       - name: "test-volume"
   >         mountPath: "/tmp"
   >   executor:
   >     cores: 1
   >     instances: 1
   >     memory: "512m"
   >     labels:
   >       version: 2.4.5
   >     volumeMounts:
   >       - name: "test-volume"
   >         mountPath: "/tmp"
   > ```
   
   **Thank you for your reply. Well, this is the DAG file that reproduces the problem:**
   ```
   from __future__ import annotations
   from datetime import datetime, timedelta
   from airflow import DAG
   # Operators; we need this to operate!
   from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
   from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
   from airflow.operators.empty import EmptyOperator
   
   
   with DAG(
       dag_id="spark_pi",
       default_args={'max_active_runs': 1},
       description='submit spark-pi as sparkApplication on kubernetes',
       schedule=timedelta(days=1),
       start_date=datetime(2021, 1, 1),
       catchup=False,
   ) as dag:
       t0 = EmptyOperator(task_id='airflow_health_check', retries=1, dag=dag)
       t1 = SparkKubernetesOperator(
           task_id='main_app',
           kubernetes_conn_id='kubernetes_cluster',
           namespace="spark",
           application_file="/opt/airflow/kubernetes/templates/airflow-worker-template.yaml",
           do_xcom_push=True,
           dag=dag,
   
       )
       t2 = SparkKubernetesSensor(
           task_id='spark_pi_monitor',
           kubernetes_conn_id='kubernetes_cluster',
           namespace="spark",
           application_name="{{ task_instance.xcom_pull(task_ids='main_app')['metadata']['name'] }}",
           dag=dag,
       )
       t0 >> t1 >> t2
   ```
   **When debugging on my own, I found that the problem is in this function:**
   https://github.com/apache/airflow/blob/b9e133e40c2848b0d555051a99bf8d2816fd28a7/airflow/providers/cncf/kubernetes/hooks/kubernetes.py#L33-L38
   
   Here is what I found while debugging it:
   
   - _Reproducing the same problem with the code currently in Airflow:_
   ```
   from airflow.utils import yaml
   body = yaml.safe_load("airflow-worker-template.yaml")
   ```
   In this case `body` is just the path string: printing it gives `"airflow-worker-template.yaml"`.
   So we end up with `TypeError: string indices must be integers` as soon as we try to access `"metadata"`, as the snippet below illustrates.
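   To make that concrete, here is a minimal standalone snippet (plain PyYAML here rather than the `airflow.utils.yaml` wrapper, but `safe_load` behaves the same way) showing why exactly that error appears:
   ```python
   import yaml

   # safe_load happily parses the *path string* as a YAML document and simply
   # returns the string itself, not the content of the file it points to.
   body = yaml.safe_load("airflow-worker-template.yaml")
   print(type(body))  # <class 'str'>

   # Indexing a str with a str key is what produces the error seen in the task log:
   body["metadata"]   # TypeError: string indices must be integers
   ```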
   - _Resolving the problem:_
   With the same code, I just pass the opened YAML file object to the function instead of the path, like this:
   ```
   from airflow.utils import yaml
   with open("airflow-worker-template.yaml ") as yaml_file:
       body = yaml.safe_load(yaml_file)
   ```
   In that case `body` holds the parsed YAML content, so we can access `"metadata"` without any error.
   Therefore, what I suggest is that the function should check whether the given value ends with `".yaml"` (i.e. it is a path to a file); if so, it should open that file and parse its content, and otherwise keep the current behaviour.
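   For example, something along these lines (just a rough sketch of the idea, with plain PyYAML standing in for the `airflow.utils.yaml` wrapper, not the actual Airflow implementation) could cover both cases:
   ```python
   import yaml


   def _load_body_to_dict(body: str) -> dict:
       """Sketch: accept either inline YAML or a path to a .yaml/.yml file."""
       value = body.strip()  # also guards against a trailing space after the extension
       if value.endswith((".yaml", ".yml")):
           # Looks like a file path: open the file and parse its content.
           with open(value) as yaml_file:
               return yaml.safe_load(yaml_file)
       # Otherwise treat the string as the YAML document itself (current behaviour).
       return yaml.safe_load(body)
   ```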


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org