Posted to commits@airflow.apache.org by "Ash Berlin-Taylor (Jira)" <ji...@apache.org> on 2021/05/05 08:31:00 UTC

[jira] [Closed] (AIRFLOW-2058) Scheduler uses MainThread for DAG file processing

     [ https://issues.apache.org/jira/browse/AIRFLOW-2058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ash Berlin-Taylor closed AIRFLOW-2058.
--------------------------------------
    Resolution: Fixed

This is no longer true in 1.10.4+.

> Scheduler uses MainThread for DAG file processing
> -------------------------------------------------
>
>                 Key: AIRFLOW-2058
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2058
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DAG
>    Affects Versions: 1.9.0
>         Environment: Ubuntu, Airflow 1.9, Sequential executor
>            Reporter: Yang Pan
>            Assignee: Yang Pan
>            Priority: Blocker
>
> From reading the [source code|https://github.com/apache/incubator-airflow/blob/61ff29e578d1121ab4606fe122fb4e2db8f075b9/airflow/utils/dag_processing.py#L538], it appears that the scheduler processes each DAG file, whether a .py or a .zip, in a new process.
>  
> If I understand correctly, what should happen in theory when a .zip file is processed is that the dedicated process adds the .zip file to the PYTHONPATH (sys.path) and loads the file's modules and dependencies. When the DAG has been read, the process is destroyed. Since the PYTHONPATH is process scoped, it won't pollute other processes.
>  
> However, printing out the thread and process IDs shows that the Airflow scheduler can sometimes accidentally process a file in the main process instead of creating a new one, and that's when the collision happens.
>  
> Here is a snippet of the PYTHONPATH (sys.path) while advanced_dag_dependency-1.zip is being processed. As you can see, when the file is processed by MainThread, sys.path contains other .zip files. When it is processed by the dedicated DagFileProcessor thread, only the required .zip is added.
>  
> sys.path :['/root/airflow/dags/yang_subdag_2.zip', '/root/airflow/dags/yang_subdag_2.zip', '/root/airflow/dags/yang_subdag_1.zip', '/root/airflow/dags/yang_subdag_1.zip', '/root/airflow/dags/advanced_dag_dependency-2.zip', '/root/airflow/dags/advanced_dag_dependency-2.zip', '/root/airflow/dags/advanced_dag_dependency-1.zip', '/root/airflow/dags/advanced_dag_dependency-1.zip', '/root/airflow/dags/yang_subdag_1', '/usr/local/bin', '/usr/lib/python2.7', '/usr/lib/python2.7/plat-x86_64-linux-gnu', '/usr/lib/python2.7/lib-tk', '/usr/lib/python2.7/lib-old', '/usr/lib/python2.7/lib-dynload', '/usr/local/lib/python2.7/dist-packages', '/usr/lib/python2.7/dist-packages', '/usr/lib/python2.7/dist-packages/PILcompat', '/root/airflow/config', '/root/airflow/dags', '/root/airflow/plugins'] 
> Print from MyFirstOperator in Dag 1 
> process id: 5059 
> thread id: <_MainThread(*MainThread*, started 140339858560768)> 
>  
> sys.path :[u'/root/airflow/dags/advanced_dag_dependency-1.zip', '/usr/local/bin', '/usr/lib/python2.7', '/usr/lib/python2.7/plat-x86_64-linux-gnu', '/usr/lib/python2.7/lib-tk', '/usr/lib/python2.7/lib-old', '/usr/lib/python2.7/lib-dynload', '/usr/local/lib/python2.7/dist-packages', '/usr/lib/python2.7/dist-packages', '/usr/lib/python2.7/dist-packages/PILcompat', '/root/airflow/config', '/root/airflow/dags', '/root/airflow/plugins'] 
> Print from MyFirstOperator in Dag 1 
> process id: 5076 
> thread id: <_MainThread(*DagFileProcessor283*, started 140137838294784)> 
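
For readers who want to reproduce the isolation the reporter expected, here is a minimal Python sketch. It is not Airflow's code. It assumes a POSIX system where the "fork" start method is available, and the names FAKE_ZIP, load_in_child and run_isolated are hypothetical, chosen only for this illustration. The child process adds a .zip path to its own sys.path and reports its process id and thread name, in the same way as the diagnostic prints quoted above. The parent's sys.path should be unchanged afterwards.

```python
import multiprocessing as mp
import os
import sys
import threading

# Hypothetical path, used only for illustration; nothing is read from it.
FAKE_ZIP = "/tmp/example_dag_bundle.zip"


def load_in_child(zip_path, queue):
    """Mimic a per-file processor: extend sys.path, then report the context."""
    sys.path.insert(0, zip_path)  # only changes this process's sys.path
    queue.put({
        "pid": os.getpid(),
        "thread": threading.current_thread().name,
        "has_zip": zip_path in sys.path,
    })


def run_isolated(zip_path):
    """Run load_in_child in a separate process and return its report."""
    # Assumption: the "fork" start method exists (Linux/macOS, not Windows).
    ctx = mp.get_context("fork")
    queue = ctx.Queue()
    proc = ctx.Process(target=load_in_child, args=(zip_path, queue))
    proc.start()
    report = queue.get(timeout=30)  # read before join so the child can exit
    proc.join()
    return report


child = run_isolated(FAKE_ZIP)
parent = {"pid": os.getpid(), "has_zip": FAKE_ZIP in sys.path}
print("child :", child)
print("parent:", parent)
```

If the same load_in_child logic were instead called directly in the parent, the .zip path would stay on the parent's sys.path and accumulate across files, which matches the first sys.path dump in the report.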



--
This message was sent by Atlassian Jira
(v8.3.4#803005)