Posted to dev@beam.apache.org by Jonathan Perron <jo...@lumapps.com> on 2018/10/02 08:40:46 UTC

Source or ParDo for reading data in PostgreSQL in Python

Hello,

I am looking for a way to access data stored in PostgreSQL and don't
know whether I should go for a Source or for ParDo operations. It is stated
that ParDo can be used, but I'm not sure it will solve my problem,
so here I am! I managed to write to the database with ParDo operations
only, so I guess reading should also be possible here.

Some details about my use case:

* The Python SDK is used;

* Reading from the database is the first operation of the pipeline,
before doing some computation;

* It is performed with SQLAlchemy, but could also be done with psycopg2;

* I don't think parallelizing this operation is necessary, as the queries
are and will stay really simple (e.g. SELECT foo FROM bar WHERE fuzz).

Here are my DoFn classes:

class ExtractFromPostgreSQLFn(beam.DoFn):
    """
    Extract a PCollection from PostgreSQL.
    """

    def start_bundle(self):
        # The SQLAlchemy session is created per bundle; it is not
        # picklable, so it must not be created in __init__.
        self._session = Session()

    def process(self, element):
        raise NotImplementedError

    def finish_bundle(self):
        self._session.close()


class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
    def __init__(self, arg1, arg2):
        # The runner calls start_bundle() without arguments, so the
        # query parameters have to be passed to the constructor instead.
        super(ReadEntityFromPostgresqlFn, self).__init__()
        self._arg1 = arg1
        self._arg2 = arg2

    def process(self, element):
        entities = (
            self._session.query(Entity)
            .filter(Entity.arg1 == self._arg1)
            .filter(Entity.arg2 == self._arg2)
            .all()
        )
        yield (self._arg1, entities)

As I said, I used it just after the initialization of the pipeline:

p = beam.Pipeline(options=pipeline_options)
psql_entities = p | "Extract Entities from PSQL backup" >> beam.ParDo(
    ReadEntityFromPostgresqlFn(arg1, arg2)
)

Unfortunately, I end up with an AttributeError: 'PBegin' object has no
attribute 'windowing' error.

Where did I make a mistake? I'll take any input you can provide on
this topic.

Thanks for your time,

Jonathan



Re: Source or ParDo for reading data in PostgreSQL in Python

Posted by Jonathan Perron <jo...@inuse.eu>.
I found that I forgot to perform a beam.Create() first: ParDo needs a
PCollection as input, and the bare pipeline object (a PBegin) is not one.
So no need to answer me, this solves my problem.
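
For the record, here is roughly what the start of the pipeline looks like
now (a sketch, with the placeholder names from my original message; the
created element is only there to trigger the query once):

import apache_beam as beam

p = beam.Pipeline(options=pipeline_options)
psql_entities = (
    p
    # beam.Create() turns the bare pipeline (a PBegin) into a real
    # PCollection, which ParDo can then consume; one dummy element is
    # enough to run the query once.
    | "Seed" >> beam.Create(["seed"])
    | "Extract Entities from PSQL backup"
      >> beam.ParDo(ReadEntityFromPostgresqlFn(arg1, arg2))
)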


-- 
OptimData
71 rue Desnouettes, 75015 Paris, France
http://inuse.eu





Re: Source or ParDo for reading data in PostgreSQL in Python

Posted by Jonathan Perron <jo...@lumapps.com>.
Hello again,

Thanks to
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/estimate_pi.py,
I saw that the first step of my pipeline must be a beam.Create() with an
iterable. Doing so solved my problem.
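
Concretely (a sketch, still with the placeholder Entity/arg1/arg2 names and
the session handling from my first message), the query parameters can even
travel through the created elements instead of through the constructor:

class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
    def process(self, element):
        # Each created element carries the query parameters.
        arg1, arg2 = element
        entities = (
            self._session.query(Entity)
            .filter(Entity.arg1 == arg1)
            .filter(Entity.arg2 == arg2)
            .all()
        )
        yield (arg1, entities)


p = beam.Pipeline(options=pipeline_options)
psql_entities = (
    p
    | "Query parameters" >> beam.Create([(arg1, arg2)])
    | "Extract Entities from PSQL backup"
      >> beam.ParDo(ReadEntityFromPostgresqlFn())
)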

Sorry for my mistake.


Re: Source or ParDo for reading data in PostgreSQL in Python

Posted by Pascal Gula <pa...@plantix.net>.
Hi Jonathan,
I had a similar requirement to yours, but for MongoDB, and I tentatively
wrote an IO connector for it that you can find here:
https://github.com/PEAT-AI/beam-extended
It works with the DirectRunner in read mode (I still need to test it on
Dataflow), but I faced some issues with the connector's serializability in
write mode. Long story short, a pymongo client is not serializable, and that
is something you will have to check for SQLAlchemy as well.
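A sketch of the usual workaround (not taken from my connector, just the
pattern I would try first): keep only plain configuration, e.g. the
connection string, on the DoFn and build the engine/session lazily in
start_bundle() on the worker, so nothing non-picklable has to be serialized:

import apache_beam as beam
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


class PostgresQueryFn(beam.DoFn):
    """Hypothetical DoFn: only the connection URL gets pickled."""

    def __init__(self, connection_url):
        self._connection_url = connection_url
        self._session = None

    def start_bundle(self):
        # Built on the worker, so the engine/session never go through pickle.
        engine = create_engine(self._connection_url)
        self._session = sessionmaker(bind=engine)()

    def process(self, element):
        # ... run the query with self._session here ...
        yield element

    def finish_bundle(self):
        if self._session is not None:
            self._session.close()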
Generally speaking, take a look at this part of the documentation if you want
to know more about IO connector development:
https://beam.apache.org/documentation/sdks/python-custom-io/
I am not an expert, but I'll be glad to help, since I will also need to
support PostgreSQL in the near future!
Cheers,
Pascal


[image: --]

Pascal Gula
[image: https://]about.me/metanov
<https://about.me/metanov?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=chrome_ext>



-- 

Pascal Gula
Senior Data Engineer / Scientist
+49 (0)176 34232684
www.plantix.net
PEAT GmbH
Kastanienallee 4
10435 Berlin // Germany
Download the App! <https://play.google.com/store/apps/details?id=com.peat.GartenBank>