Posted to dev@airflow.apache.org by Flo Rance <tr...@gmail.com> on 2019/03/20 15:30:39 UTC
PostgreSQL hook
Hi,
I don't know if this is the correct place to ask.
I'm trying to implement one of my cronjobs using Airflow. One of the tasks
is to load files into a temporary table and then update another table in a
Postgres db.
For that, I was previously using a SQL script like this:
BEGIN;
CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog WITH
NO DATA;
\COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';' CSV
ENCODING 'LATIN1' NULL '';
DELETE FROM catalog_tmp WHERE code IS NULL;
...
COMMIT;
I would like to replace \copy with copy_expert from the PostgreSQL hook. Is
that realistic?
If so, how can I combine a SQL script and that hook in one task?
Regards,
Flo
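For reference, the script above can be mirrored step by step in plain Python. This is only a sketch of the load-then-clean transaction using the standard library's `csv` and `sqlite3` modules, with sqlite standing in for PostgreSQL (so \COPY becomes a loop of inserts) and made-up sample data in place of /home/cat/catalog.csv:

```python
import csv
import io
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE catalog (code TEXT, name TEXT)")

# Stand-in for the ';'-delimited CSV file, with '' meaning NULL.
csv_data = io.StringIO("A1;widget\n;orphan\nB2;gadget\n")

with conn:  # one transaction, like BEGIN ... COMMIT
    # CREATE TEMP TABLE catalog_tmp ... AS SELECT * FROM catalog WITH NO DATA
    conn.execute("CREATE TEMP TABLE catalog_tmp AS SELECT * FROM catalog WHERE 0")
    # \COPY replacement: insert each CSV row, mapping '' to NULL
    for row in csv.reader(csv_data, delimiter=";"):
        conn.execute(
            "INSERT INTO catalog_tmp VALUES (?, ?)",
            [field if field != "" else None for field in row],
        )
    # DELETE FROM catalog_tmp WHERE code IS NULL
    conn.execute("DELETE FROM catalog_tmp WHERE code IS NULL")

loaded = conn.execute("SELECT COUNT(*) FROM catalog_tmp").fetchone()[0]
print(loaded)  # 2 (the row with an empty code was removed)
```

The key detail, as the rest of the thread shows, is that the temp table only exists as long as every statement runs on the same connection.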
Re: PostgreSQL hook
Posted by Jiajie Zhong <zh...@hotmail.com>.
Sorry for the late reply; I've been busy with work and family.
Yes, you're right.
I reviewed `postgres_hook.py`, and it inherits from `DbApiHook`,
so each time you use the `run` function it creates a new connection:
https://github.com/apache/airflow/blob/75c633e70fdc2537a0112939a52666a5c0c2e114/airflow/hooks/dbapi_hook.py#L159-L172
In this situation, I personally suggest using a cursor in the operator:
def execute(self, context):
    pg = PostgresHook(postgres_conn_id=self.postgres_conn_id)
    cursor = pg.get_cursor()  # <-- one cursor, so all statements share one session

    if self.pg_preoperator:
        self.log.info("Running Postgres preoperator")
        cursor.execute(self.pg_preoperator)

    self.log.info("Loading file to pg table.")
    cursor.execute(self.sql)

    if self.pg_postoperator:
        self.log.info("Running Postgres postoperator")
        cursor.execute(self.pg_postoperator)
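The reason a single cursor works while repeated `run` calls do not is that a TEMP table is visible only to the database session (connection) that created it. The same scoping can be demonstrated with the standard library's `sqlite3` standing in for PostgreSQL, as a rough sketch:

```python
import os
import sqlite3
import tempfile

# Two connections to the same database file simulate two separate run() calls.
path = os.path.join(tempfile.mkdtemp(), "demo.db")
conn_a = sqlite3.connect(path)
conn_b = sqlite3.connect(path)

# A regular table is visible from both connections.
conn_a.execute("CREATE TABLE catalog (code TEXT)")
conn_a.commit()
both_see_catalog = conn_b.execute("SELECT COUNT(*) FROM catalog").fetchone()[0] == 0

# A TEMP table is scoped to the session that created it.
conn_a.execute("CREATE TEMP TABLE catalog_tmp (code TEXT)")
try:
    conn_b.execute("SELECT * FROM catalog_tmp")
    temp_visible_elsewhere = True
except sqlite3.OperationalError:  # "no such table" from the other session
    temp_visible_elsewhere = False

# Reusing one connection's cursor, every statement sees the temp table.
cur = conn_a.cursor()
cur.execute("INSERT INTO catalog_tmp VALUES ('x'), (NULL)")
cur.execute("DELETE FROM catalog_tmp WHERE code IS NULL")
rows_left = cur.execute("SELECT COUNT(*) FROM catalog_tmp").fetchone()[0]
print(temp_visible_elsewhere, rows_left)  # False 1
```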
Best wish.
-- Jiajie
________________________________
From: Flo Rance <tr...@gmail.com>
Sent: Tuesday, March 26, 2019 15:55
To: dev@airflow.apache.org
Subject: Re: PostgreSQL hook
Re: PostgreSQL hook
Posted by Flo Rance <tr...@gmail.com>.
Hi,
This is what I tried first, but it's not working.
pg = PostgresHook(postgres_conn_id=self.postgres_conn_id)
pg.run(...)
This part of the code will always use a new connection, and therefore the
temp table created in the preoperator will not be accessible to the COPY
command.
For this to work, I had to use a cursor instead.
Flo
On Sat, Mar 23, 2019 at 10:22 AM Jiajie Zhong <zh...@hotmail.com>
wrote:
Re: PostgreSQL hook
Posted by Jiajie Zhong <zh...@hotmail.com>.
I wrote some demo code here. Maybe it won't work as-is, but I think the idea is right.

# create a file named file_to_Pg.py
from airflow.models import BaseOperator
from airflow.hooks.postgres_hook import PostgresHook

class FileToPgTransfer(BaseOperator):
    def __init__(self, postgres_conn_id, pg_preoperator, sql, pg_postoperator, ... , *args, **kwargs):
        super().__init__(*args, **kwargs)
        # other init here
        self.postgres_conn_id = postgres_conn_id
        self.pg_preoperator = pg_preoperator
        self.sql = sql
        self.pg_postoperator = pg_postoperator
        # other init here

    def execute(self, context):
        pg = PostgresHook(postgres_conn_id=self.postgres_conn_id)

        if self.pg_preoperator:
            self.log.info("Running Postgres preoperator")
            pg.run(self.pg_preoperator)

        self.log.info("Loading file to pg table.")
        pg.run(sql=self.sql)

        if self.pg_postoperator:
            self.log.info("Running Postgres postoperator")
            pg.run(self.pg_postoperator)

# you could use the code below in a DAG file
task = FileToPgTransfer(
    postgres_conn_id='postgres_conn_id',
    pg_preoperator='CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog WITH NO DATA',
    sql=r"\COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';' CSV ENCODING 'LATIN1' NULL ''",
    pg_postoperator='DELETE FROM catalog_tmp WHERE code IS NULL',
)
Best wish.
-- jiajie
________________________________
From: Flo Rance <tr...@gmail.com>
Sent: Thursday, March 21, 2019 22:31
To: dev@airflow.apache.org
Subject: Re: PostgreSQL hook
Re: PostgreSQL hook
Posted by Flo Rance <tr...@gmail.com>.
I've found some more information that seems to confirm my suspicion.
The connection is not persistent between the pg_preoperator step and the
copy_expert one.
There's a suggestion to make persistence a property of the connection:
https://stackoverflow.com/questions/50858770/airflow-retain-the-same-database-connection
I would be very grateful if someone could help me implement that in my
operator that uses PostgreSQL Hook.
Regards,
Flo
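The StackOverflow suggestion (making the connection persistent on the hook) can be sketched as a small wrapper that caches one connection and reuses it for every statement. This is only an illustration, not an Airflow API: `PersistentHook` is a hypothetical name, and `sqlite3` stands in for psycopg2 here.

```python
import sqlite3

class PersistentHook:
    """Hypothetical hook-like wrapper that caches one DB connection,
    so every statement runs in the same session."""

    def __init__(self, dsn):
        self.dsn = dsn
        self._conn = None

    def get_conn(self):
        # Reuse the cached connection instead of opening a new one per call.
        if self._conn is None:
            self._conn = sqlite3.connect(self.dsn)
        return self._conn

    def run(self, sql):
        cur = self.get_conn().cursor()
        cur.execute(sql)
        rows = cur.fetchall()  # [] for statements with no result set
        self.get_conn().commit()
        return rows

hook = PersistentHook(":memory:")
hook.run("CREATE TEMP TABLE catalog_tmp (code TEXT)")
hook.run("INSERT INTO catalog_tmp VALUES ('a'), (NULL)")
hook.run("DELETE FROM catalog_tmp WHERE code IS NULL")
remaining = hook.run("SELECT COUNT(*) FROM catalog_tmp")[0][0]
print(remaining)  # 1 -- the temp table survived across run() calls
```

Because all four `run()` calls share the cached connection, the temp table created by the first call is still there for the later ones, which is exactly what the stock `DbApiHook.run` does not guarantee.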
On Thu, Mar 21, 2019 at 2:38 PM Flo Rance <tr...@gmail.com> wrote:
Re: PostgreSQL hook
Posted by Flo Rance <tr...@gmail.com>.
No problem.
Thanks for the link; I was able to create a plugin and an operator that do
almost what I want.
My only issue is with the temp table: it's not available when I call
copy_expert. So it seems it's not the same session as the one that created
the temp table, because with a standard table I don't have this issue.
Does anyone have an idea how to fix this?
Regards,
Flo
On Thu, Mar 21, 2019 at 9:36 AM jiajie zhong <zh...@hotmail.com>
wrote:
Re: PostgreSQL hook
Posted by jiajie zhong <zh...@hotmail.com>.
You can use Airflow plugins; take a look at https://airflow.apache.org/plugins.html.
BTW, sorry for sending a duplicate e-mail last night; it was due to a network failure.
Best wish.
- jiajie
On Wed, Mar 20, 2019 at 5:05 PM jiajie zhong <zh...@hotmail.com>
wrote:
> Hi, Flo. I am not good at PG, but I found this code in our master branch:
>
> https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/hooks/postgres_hook.py#L63-L89
> I think maybe this is what you're looking for.
>
> And we recommend using an Operator to do things rather than a Hook
> directly. But we have no "local-file-pg-operator"; maybe you should add
> this function yourself.
>
> BTW, I think
> BEGIN;
>
> CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog WITH
> NO DATA;
>
> \COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';' CSV
> ENCODING 'LATIN1' NULL '';
>
> DELETE FROM catalog_tmp WHERE code IS NULL;
> ...
> COMMIT;
> what you describe is a transaction, so it should be done in a single
> operator. You could write code just like
>
> https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/operators/hive_to_mysql.py#L70-L71
>
> that have "pg_preoperator" and "pg_postoperator" parameter, but extract
> data from local file instand of hive.
>
> ________________________________
> From: Flo Rance <tr...@gmail.com>
> Sent: Wednesday, March 20, 2019 23:30
> To: dev@airflow.apache.org
> Subject: PostgreSQL hook
>
> Hi,
>
> I don't know if it's the correct place to ask for that.
>
> I'm trying to implement one of my cronjob using airflow. One of the tasks
> is to load files in a temporary table and then update another table in a
> postgres db.
> For that, I was previously using a sql script like that:
>
> BEGIN;
>
> CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog WITH
> NO DATA;
>
> \COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';' CSV
> ENCODING 'LATIN1' NULL '';
>
> DELETE FROM catalog_tmp WHERE code IS NULL;
> ...
> COMMIT;
>
> I would like to replace \copy with the copy_expert from postgresql hook. Is
> that realistic ?
> If yes, how can I combine a sql script and that hook in one task ?
>
> Regards,
> Flo
>
Re: PostgreSQL hook
Posted by Flo Rance <tr...@gmail.com>.
Hi,
Thank you for this explanation. If I summarize, I'll have to write a
file_to_postgres Operator, with pg_preoperator and pg_postoperator
parameters.
Just a simple question: Where should I add and store this Operator in the
airflow ecosystem ?
Regards,
Flo
Re: PostgreSQL hook
Posted by jiajie zhong <zh...@hotmail.com>.
Hi, Flo. I am not an expert in PG, but I found code in our master branch
https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/hooks/postgres_hook.py#L63-L89
I think this may be what you're looking for.
Also, we recommend using an Operator rather than calling a Hook directly. But we have no "local-file-pg-operator" yet, so you may need to add this functionality yourself.
BTW, I think
BEGIN;
CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog WITH
NO DATA;
\COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';' CSV
ENCODING 'LATIN1' NULL '';
DELETE FROM catalog_tmp WHERE code IS NULL;
...
COMMIT;
what you describe is a transaction, so it should run in a single operator. You could write code just like
https://github.com/apache/airflow/blob/dd8ce6a7123170ef4b0f719fb773527b17d9348c/airflow/operators/hive_to_mysql.py#L70-L71
which has "pg_preoperator" and "pg_postoperator" parameters, but extracts data from a local file instead of Hive.
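A sketch of such an operator, modeled loosely on hive_to_mysql as suggested above. The class and parameter names (pg_preoperator, pg_postoperator) follow this discussion and are not an existing Airflow API; copy_expert and get_conn are the real psycopg2/PostgresHook calls:

```python
def copy_csv_statement(table, columns, delimiter=";", encoding="LATIN1"):
    """Build the COPY ... FROM STDIN statement that copy_expert() feeds
    from a client-side file (the server-side equivalent of psql's \\COPY)."""
    cols = ", ".join(columns)
    return (
        "COPY {table} ({cols}) FROM STDIN "
        "WITH DELIMITER '{d}' CSV ENCODING '{e}' NULL ''"
    ).format(table=table, cols=cols, d=delimiter, e=encoding)


class FileToPostgresOperator(object):  # would subclass BaseOperator in Airflow
    def __init__(self, filepath, table, columns,
                 postgres_conn_id="postgres_default",
                 pg_preoperator=None, pg_postoperator=None):
        self.filepath = filepath
        self.table = table
        self.columns = columns
        self.postgres_conn_id = postgres_conn_id
        self.pg_preoperator = pg_preoperator
        self.pg_postoperator = pg_postoperator

    def execute(self, context):
        # Imported here so the sketch stays importable without Airflow.
        from airflow.hooks.postgres_hook import PostgresHook
        hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
        conn = hook.get_conn()
        cursor = conn.cursor()
        try:
            if self.pg_preoperator:
                cursor.execute(self.pg_preoperator)
            with open(self.filepath) as f:
                cursor.copy_expert(
                    copy_csv_statement(self.table, self.columns), f)
            if self.pg_postoperator:
                cursor.execute(self.pg_postoperator)
            # One commit for pre-SQL, COPY, and post-SQL: a single transaction,
            # so an ON COMMIT DROP temp table created in pg_preoperator survives
            # until here.
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()
```

The pg_preoperator would carry the CREATE TEMP TABLE statement and the pg_postoperator the DELETE/UPDATE steps from the original script.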
________________________________
From: Flo Rance <tr...@gmail.com>
Sent: Wednesday, March 20, 2019 23:30
To: dev@airflow.apache.org
Subject: PostgreSQL hook
Hi,
I don't know if this is the correct place to ask.
I'm trying to implement one of my cronjobs using Airflow. One of the tasks
is to load files into a temporary table and then update another table in a
Postgres db.
For that, I was previously using an SQL script like this:
BEGIN;
CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS SELECT * FROM catalog WITH
NO DATA;
\COPY catalog_tmp (...) FROM '/home/cat/catalog.csv' WITH DELIMITER ';' CSV
ENCODING 'LATIN1' NULL '';
DELETE FROM catalog_tmp WHERE code IS NULL;
...
COMMIT;
I would like to replace \copy with copy_expert from the PostgreSQL hook. Is
that realistic?
If yes, how can I combine an SQL script and that hook in one task?
Regards,
Flo
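As a direct answer to the combined question, one way is a single PythonOperator whose callable runs the whole script over one connection, with copy_expert standing in for \COPY. A sketch, assuming a configured postgres_conn_id; the function name is illustrative and the column list from the original \COPY is omitted:

```python
# COPY ... FROM STDIN is the form copy_expert() feeds from a client-side
# file, which is what psql's \COPY does behind the scenes.
COPY_SQL = (
    "COPY catalog_tmp FROM STDIN "
    "WITH DELIMITER ';' CSV ENCODING 'LATIN1' NULL ''"
)


def load_catalog(csv_path, postgres_conn_id="postgres_default"):
    # Imported here so the sketch stays importable without Airflow installed.
    from airflow.hooks.postgres_hook import PostgresHook
    hook = PostgresHook(postgres_conn_id=postgres_conn_id)
    conn = hook.get_conn()
    cur = conn.cursor()
    try:
        # psycopg2 opens a transaction implicitly, so the script's explicit
        # BEGIN/COMMIT becomes conn.commit() below; ON COMMIT DROP still works.
        cur.execute(
            "CREATE TEMP TABLE catalog_tmp ON COMMIT DROP AS "
            "SELECT * FROM catalog WITH NO DATA"
        )
        with open(csv_path) as f:
            cur.copy_expert(COPY_SQL, f)
        cur.execute("DELETE FROM catalog_tmp WHERE code IS NULL")
        # ... update the real catalog table from catalog_tmp here ...
        conn.commit()
    finally:
        conn.close()
```

In a DAG this could be wired as PythonOperator(task_id='load_catalog', python_callable=load_catalog, op_args=['/home/cat/catalog.csv']).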