Posted to dev@airflow.apache.org by al...@gmail.com on 2018/05/16 01:21:16 UTC

How Airflow imports modules as it executes tasks

To start off, here is my project structure:
├── dags
│   ├── __init__.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── operators
│   │   │   ├── __init__.py
│   │   │   ├── first_operator.py
│   │   └── util
│   │       ├── __init__.py
│   │       ├── db.py
│   ├── my_dag.py

Here are the versions and details of the Airflow Docker setup:

In different tasks of my DAG I'm connecting to a database (not the Airflow DB). I've set up DB connection pooling and expected my db.py to be loaded once across the DagRun. However, in the logs I can see that each task imports the module and that new DB connections are made by each and every task. I can tell that db.py is loaded in each task because of the line below in db.py:

logging.info("I was loaded {}".format(random.randint(0,100)))

I understand that each operator can technically run on a separate machine, and it makes sense that each task runs more or less independently. However, I'm not sure whether this also applies when using the LocalExecutor. The question is: how can I share resources (DB connections) across tasks when using the LocalExecutor?
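
To make this concrete, here is a simplified, hypothetical sketch of db.py (the engine URL and pool settings below are placeholders, not my actual configuration):

# dags/core/util/db.py -- simplified sketch, connection details are placeholders
import logging
import random

from sqlalchemy import create_engine

# This runs every time the module is imported in a fresh process.
logging.info("I was loaded {}".format(random.randint(0, 100)))

# Module-level connection pool (placeholder URL and pool sizes).
engine = create_engine(
    "postgresql://user:password@dbhost:5432/mydb",
    pool_size=5,
    max_overflow=2,
)

def get_connection():
    # Checks a connection out of this process's pool.
    return engine.connect()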

Re: How Airflow imports modules as it executes tasks

Posted by Song Liu <so...@outlook.com>.
I think there might be two ways:

1. Set up the connections via the Airflow UI (http://airflow.readthedocs.io/en/latest/configuration.html#connections); I guess this could also be done in your code (a minimal sketch follows below).

2. Put your connection setup into an operator at the beginning of your DAG.
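
For option 1, a rough sketch of reading such a Connection from task code (this assumes an Airflow 1.x style import and a connection with conn_id "my_db" defined in the UI; both are assumptions):

# Hypothetical sketch: build a DB URL from an Airflow Connection ("my_db")
# defined via the UI, instead of hard-coding credentials in db.py.
from airflow.hooks.base_hook import BaseHook

def db_url(conn_id="my_db"):
    conn = BaseHook.get_connection(conn_id)  # looked up in Airflow's metadata DB
    return "postgresql://{login}:{password}@{host}:{port}/{schema}".format(
        login=conn.login,
        password=conn.password,
        host=conn.host,
        port=conn.port,
        schema=conn.schema,
    )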


Re: How Airflow imports modules as it executes tasks

Posted by al...@gmail.com.
Thanks for the explanation, really helpful.

Cheers,
Ali


Re: How Airflow imports modules as it executes tasks

Posted by Ruiqin Yang <yr...@gmail.com>.
You are right, but that only holds within the same process. The way each operator gets executed is that one `airflow run` command is generated and sent to the local executor; the executor spins up a subprocess to run `airflow run --raw`, which parses the DAG file again and calls the operator's execute(). Thus each task gets its own process that parses the *.py file, so the module ends up being imported multiple times, once per task.
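
You can see the same effect outside Airflow with a small, self-contained sketch (not Airflow code; the task names are made up): each child process re-imports the module, so module-level state such as a connection pool is rebuilt per task instead of being shared.

# Toy illustration of the behaviour above, not Airflow code.
# Each child process re-imports this module, so module-level state
# (standing in for db.py's connection pool) is rebuilt for every "task".
import multiprocessing
import os
import uuid

POOL_ID = uuid.uuid4().hex[:8]  # pretend this is an expensive connection pool

def run_task(task_id):
    print("{}: pid={} pool={}".format(task_id, os.getpid(), POOL_ID))

if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")  # fresh interpreter per child
    for task_id in ("task_a", "task_b", "task_c"):
        p = ctx.Process(target=run_task, args=(task_id,))
        p.start()
        p.join()
    # Each line prints a different pool id: nothing is shared across processes.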

Hope this helped, cheers
Kevin Y


Re: How Airflow imports modules as it executes tasks

Posted by al...@gmail.com.
Thanks Kevin. Yes, I'm importing db in different operators. That said, my understanding is that if a module has already been imported, it isn't loaded again even if you import it again (and I reckon this is why the Singleton pattern isn't commonly used in Python). Is that right?


Re: How Airflow imports modules as it executes tasks

Posted by Ruiqin Yang <yr...@gmail.com>.
Not exactly answering your question, but the reason db.py is loaded in each task might be that you have something like `import db` in each of your *.py files, and Airflow spins up one process to parse each *.py file, so your db.py gets loaded multiple times.

I'm not sure how you can share the connection pool if it is created within the same process your operator runs in, since Airflow spins up one process per task even with the LocalExecutor. You might have to make the connection pool available to other processes (I don't know how that could be done) in order to share it.
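
Within a single task's process you can at least reuse connections by building the pool lazily, once per process; a hypothetical sketch (the module layout, names and URL are assumptions):

# Hypothetical db.py variant: build the pool lazily, once per process.
# Every task still gets its own pool (its own process), but repeated
# queries inside one task reuse the same pooled connections.
from sqlalchemy import create_engine

_engine = None

def get_engine():
    global _engine
    if _engine is None:
        # Placeholder URL; in practice this could come from an Airflow Connection.
        _engine = create_engine("postgresql://user:password@dbhost:5432/mydb",
                                pool_size=5)
    return _engine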

Cheers,
Kevin Y
