Posted to dev@airflow.apache.org by Flo Rance <tr...@gmail.com> on 2019/03/20 15:30:39 UTC

PostgreSQL hook

Hi,

I don't know if it's the correct place to ask for that.

I'm trying to implement one of my cronjobs using Airflow. One of the tasks
is to load files into a temporary table and then update another table in a
Postgres db.
For that, I was previously using a SQL script like this:

BEGIN;

CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog WITH
NO DATA;

\COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';' CSV
ENCODING 'LATIN1' NULL '';

DELETE FROM catalog_tmp WHERE code IS NULL;
...
COMMIT;

I would like to replace \copy with copy_expert from the PostgreSQL hook. Is
that realistic?
If yes, how can I combine a SQL script and that hook in one task?

Regards,
Flo

Re: PostgreSQL hook

Posted by Jiajie Zhong <zh...@hotmail.com>.
Sorry for the late reply; I've been busy with work and family.

Yes, you're right.
I reviewed `postgres_hook.py` and it inherits from `DbApiHook`,
so each time you use the `run` function, a new connection is created:
https://github.com/apache/airflow/blob/75c633e70fdc2537a0112939a52666a5c0c2e114/airflow/hooks/dbapi_hook.py#L159-L172

In this situation, I personally suggest you use a cursor in your operator.

    def execute(self, context):
        pg = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        cursor = pg.get_cursor()  # <-- reuse this one cursor for every step

        if self.pg_preoperator:
            self.log.info("Running Postgres preoperator")
            cursor.execute(self.pg_preoperator)

        self.log.info("Loading file to pg table.")
        cursor.execute(self.sql)

        if self.pg_postoperator:
            self.log.info("Running Postgres postoperator")
            cursor.execute(self.pg_postoperator)
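
This single-cursor approach can be exercised end-to-end with stdlib sqlite3 standing in for Postgres, so it runs anywhere. It is only a sketch of the pattern: in real use the cursor would come from `PostgresHook.get_cursor()` and the CSV load from the cursor's `copy_expert`; the table, columns, and CSV contents below are made up for illustration.

```python
import csv
import io
import sqlite3

# One connection and ONE cursor for every step, so the temp table created
# by the "preoperator" is still visible to the later statements.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# preoperator: create the session-scoped temp table
cur.execute("CREATE TEMP TABLE catalog_tmp (code TEXT, name TEXT)")

# load step: stand-in for \COPY / copy_expert
csv_data = io.StringIO("A1;widget\n;orphan\nB2;gadget\n")
rows = list(csv.reader(csv_data, delimiter=";"))
cur.executemany("INSERT INTO catalog_tmp VALUES (?, ?)",
                [(code or None, name) for code, name in rows])

# postoperator: same cleanup as the original script
cur.execute("DELETE FROM catalog_tmp WHERE code IS NULL")
conn.commit()

count = cur.execute("SELECT count(*) FROM catalog_tmp").fetchone()[0]
print(count)  # 2
```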



Best wish.
-- Jiajie

Re: PostgreSQL hook

Posted by Flo Rance <tr...@gmail.com>.
Hi,

This is what I tried first, but it's not working.

pg = PostgresHook(postgres_conn_id=self.postgres_conn_id)
pg.run(...)

This part of the code will always use a new connection, and therefore the
temp table created in the preoperator will not be accessible to the COPY
command.
For this to work, I had to use a cursor instead.

Flo
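
The session-scoping behaviour described above can be reproduced with stdlib sqlite3, whose temp tables are per-connection just like in Postgres. This is a sketch of the failure mode only; the table name is a placeholder.

```python
import os
import sqlite3
import tempfile

db = os.path.join(tempfile.mkdtemp(), "demo.db")

# Session A creates a TEMP table and can see it.
conn_a = sqlite3.connect(db)
conn_a.execute("CREATE TEMP TABLE catalog_tmp (code TEXT)")
conn_a.execute("INSERT INTO catalog_tmp VALUES ('x')")
same_session = conn_a.execute(
    "SELECT count(*) FROM catalog_tmp").fetchone()[0]  # 1 row visible

# Session B (what each hook.run() call effectively opens) cannot see it.
conn_b = sqlite3.connect(db)
try:
    conn_b.execute("SELECT count(*) FROM catalog_tmp")
    other_session_sees_it = True
except sqlite3.OperationalError:  # "no such table: catalog_tmp"
    other_session_sees_it = False
```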


Re: PostgreSQL hook

Posted by Jiajie Zhong <zh...@hotmail.com>.
I wrote some demo code here. It may not work as-is, but I think the idea is right.


# create a file named file_to_Pg.py
from airflow.models import BaseOperator
from airflow.hooks.postgres_hook import PostgresHook

class FileToPgTransfer(BaseOperator):
    def __init__(self, postgres_conn_id, pg_preoperator, sql, pg_postoperator, ... , *args, **kwargs):
        super(FileToPgTransfer, self).__init__(*args, **kwargs)
        self.postgres_conn_id = postgres_conn_id
        self.pg_preoperator = pg_preoperator
        self.sql = sql
        self.pg_postoperator = pg_postoperator
        # init other fields here

    def execute(self, context):
        pg = PostgresHook(postgres_conn_id=self.postgres_conn_id)

        if self.pg_preoperator:
            self.log.info("Running Postgres preoperator")
            pg.run(self.pg_preoperator)

        self.log.info("loading file to pg table.")
        pg.run(sql=self.sql)

        if self.pg_postoperator:
            self.log.info("Running Postgres postoperator")
            pg.run(self.pg_postoperator)


# you could then use the code below in your DAG file
task = FileToPgTransfer(
    task_id='file_to_pg_transfer',
    postgres_conn_id='postgres_conn_id',
    pg_preoperator='CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog WITH NO DATA',
    sql="\COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';' CSV ENCODING 'LATIN1' NULL ''",
    pg_postoperator='DELETE FROM catalog_tmp WHERE code IS NULL',
)


Best wish.
-- jiajie

Re: PostgreSQL hook

Posted by Flo Rance <tr...@gmail.com>.
I've found some more information that seems to confirm my suspicion.

The connection is not persistent between the pg_preoperator step and the
copy_expert one.

There's a suggestion to make persistence a property of the connection:
https://stackoverflow.com/questions/50858770/airflow-retain-the-same-database-connection

I would be very grateful if someone could help me implement that in my
operator that uses PostgreSQL Hook.

Regards,
Flo
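
The suggestion from that link, keeping the connection as a property so it persists across calls, can be sketched generically. The class and names below are hypothetical (not Airflow's API), with stdlib sqlite3 standing in for Postgres:

```python
import sqlite3

class PersistentHook:
    """Hypothetical hook-like wrapper that caches ONE connection, so
    successive run() calls share a single database session."""

    def __init__(self, dsn):
        self.dsn = dsn
        self._conn = None

    def get_conn(self):
        # reuse the cached connection instead of opening a new one each call
        if self._conn is None:
            self._conn = sqlite3.connect(self.dsn)
        return self._conn

    def run(self, sql):
        cur = self.get_conn().cursor()
        cur.execute(sql)
        return cur

hook = PersistentHook(":memory:")
hook.run("CREATE TEMP TABLE t (x INTEGER)")   # temp table in this session
hook.run("INSERT INTO t VALUES (1)")
n = hook.run("SELECT count(*) FROM t").fetchone()[0]
```

Because every `run()` goes through the same cached connection, the temp table created by the first call is still visible to the later ones.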


Re: PostgreSQL hook

Posted by Flo Rance <tr...@gmail.com>.
No problem.

Thanks for the link, I was able to create a plugin and an operator that do
almost what I want.

My only issue is regarding the temp table, because it's not available when
I call copy_expert. So it seems to me that's not the same session as the
one that created the temp table previously, because if I use a standard
table I don't have this issue.

Does anyone have an idea how to fix this?

Regards,
Flo


Re: PostgreSQL hook

Posted by jiajie zhong <zh...@hotmail.com>.
Use Airflow plugins; maybe you should take a look at https://airflow.apache.org/plugins.html.

BTW, sorry for sending a duplicate e-mail last night; it was due to a network failure.

Best wish.
- jiajie

________________________________

Hi,

Thank you for this explanation. If I summarize, I'll have to write a
file_to_postgres Operator, with pg_preoperator and pg_postoperator
parameters.

Just a simple question: Where should I add and store this Operator in the
airflow ecosystem ?

Regards,
Flo

On Wed, Mar 20, 2019 at 5:05 PM jiajie zhong <zh...@hotmail.com>
wrote:

> Hi, Flo. I am not good at PG, but I found this code in our master branch:
>
> https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/hooks/postgres_hook.py#L63-L89
> I think maybe this is what you're looking for.
>
> Also, we recommend using an Operator to do things instead of a Hook, but we
> have no "local-file-pg-operator" yet; maybe you should add this function
> yourself.
>
> BWT, I think
> BEGIN;
>
> CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog WITH
> NO DATA;
>
> \COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';' CSV
> ENCODING 'LATIN1' NULL '';
>
> DELETE FROM catalog_tmp WHERE code IS NULL;
> ...
> COMMIT;
> what you said is a transaction, and so do in a single operator. you could
> write code just like
>
> https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/operators/hive_to_mysql.py#L70-L71
> [https://avatars3.githubusercontent.com/u/47359?s=400&v=4]<
> https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/operators/hive_to_mysql.py#L70-L71
> >
>
> apache/airflow<
> https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/operators/hive_to_mysql.py#L70-L71
> >
> Apache Airflow. Contribute to apache/airflow development by creating an
> account on GitHub.
> github.com
>
> that have "pg_preoperator" and "pg_postoperator" parameter, but extract
> data from local file instand of hive.
>
> ________________________________
> From: Flo Rance <tr...@gmail.com>
> Sent: Wednesday, March 20, 2019 23:30
> To: dev@airflow.apache.org
> Subject: PostgreSQL hook
>
> Hi,
>
> I don't know if it's the correct place to ask for that.
>
> I'm trying to implement one of my cronjob using airflow. One of the tasks
> is to load files in a temporary table and then update another table in a
> postgres db.
> For that, I was previously using a sql script like that:
>
> BEGIN;
>
> CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog WITH
> NO DATA;
>
> \COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';' CSV
> ENCODING 'LATIN1' NULL '';
>
> DELETE FROM catalog_tmp WHERE code IS NULL;
> ...
> COMMIT;
>
> I would like to replace \copy with the copy_expert from postgresql hook. Is
> that realistic ?
> If yes, how can I combine a sql script and that hook in one task ?
>
> Regards,
> Flo
>

Re: PostgreSQL hook

Posted by Flo Rance <tr...@gmail.com>.
Hi,

Thank you for this explanation. If I summarize, I'll have to write a
file_to_postgres operator with pg_preoperator and pg_postoperator
parameters.

Just a simple question: where should I add and store this operator in the
Airflow ecosystem?

Regards,
Flo

On Wed, Mar 20, 2019 at 5:05 PM jiajie zhong <zh...@hotmail.com>
wrote:

> [quoted text snipped; the full message appears below]

Re: PostgreSQL hook

Posted by jiajie zhong <zh...@hotmail.com>.
Hi, Flo. I am not good at PG, but I found this code on our master branch:
https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/hooks/postgres_hook.py#L63-L89
I think this may be what you are looking for.

Also, we recommend using an Operator to do things rather than a Hook
directly, but we have no "local-file-to-pg" operator yet, so you may have
to add that functionality yourself.

BTW, I think what you wrote:

BEGIN;

CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog WITH
NO DATA;

\COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';' CSV
ENCODING 'LATIN1' NULL '';

DELETE FROM catalog_tmp WHERE code IS NULL;
...
COMMIT;

is a transaction and should be done in a single operator. You could write
code just like
https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/operators/hive_to_mysql.py#L70-L71
which has "pg_preoperator"- and "pg_postoperator"-style parameters, but
extracts data from a local file instead of Hive.
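The operator suggested above, modeled on hive_to_mysql's pre/post-operator pattern, could be sketched like this. The class, module and parameter names are hypothetical (this is not an existing Airflow operator); `PostgresHook.run` and `PostgresHook.copy_expert` are real hook methods, and the import fallback only lets the sketch be exercised without an Airflow installation.

```python
# Sketch of a hypothetical LocalFileToPostgresOperator with pre/post SQL
# hooks, in the style of hive_to_mysql's mysql_preoperator/mysql_postoperator.

try:
    from airflow.models import BaseOperator
except ImportError:  # fallback so the sketch can run without Airflow installed
    class BaseOperator(object):
        def __init__(self, *args, **kwargs):
            pass

class LocalFileToPostgresOperator(BaseOperator):
    """Load a local file into Postgres via COPY, with optional pre/post SQL."""

    def __init__(self, filepath, copy_sql,
                 pg_preoperator=None, pg_postoperator=None,
                 postgres_conn_id="postgres_default", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.filepath = filepath
        self.copy_sql = copy_sql  # e.g. "COPY catalog_tmp FROM STDIN WITH (FORMAT csv)"
        self.pg_preoperator = pg_preoperator
        self.pg_postoperator = pg_postoperator
        self.postgres_conn_id = postgres_conn_id

    def execute(self, context):
        from airflow.hooks.postgres_hook import PostgresHook
        hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        if self.pg_preoperator:
            hook.run(self.pg_preoperator)      # e.g. create a staging table
        hook.copy_expert(self.copy_sql, self.filepath)  # client-side COPY
        if self.pg_postoperator:
            hook.run(self.pg_postoperator)     # e.g. merge into the real table
```

One caveat: each `hook.run()` / `copy_expert()` call opens its own connection, so a `TEMP ... ON COMMIT DROP` table created in `pg_preoperator` would not be visible to the COPY. Statements that must share one transaction should be issued through a single connection/cursor instead, or a regular (non-temp) staging table should be used.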

________________________________
From: Flo Rance <tr...@gmail.com>
Sent: Wednesday, March 20, 2019 23:30
To: dev@airflow.apache.org
Subject: PostgreSQL hook

Hi,

I don't know if this is the correct place to ask.

I'm trying to implement one of my cronjobs using Airflow. One of the tasks
is to load files into a temporary table and then update another table in a
Postgres DB.
For that, I was previously using a SQL script like this:

BEGIN;

CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog WITH
NO DATA;

\COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';' CSV
ENCODING 'LATIN1' NULL '';

DELETE FROM catalog_tmp WHERE code IS NULL;
...
COMMIT;

I would like to replace \copy with copy_expert from the PostgreSQL hook. Is
that realistic?
If yes, how can I combine a SQL script and that hook in one task?

Regards,
Flo