Posted to dev@beam.apache.org by Jonathan Perron <jo...@lumapps.com> on 2018/10/02 08:40:46 UTC
Source or ParDo for reading data in PostgreSQL in Python
Hello,
I am looking for a way to access data stored in PostgreSQL and don't
know if I should go for a Source or ParDo operation. It is stated that
ParDo could be used, but I'm not sure it will solve my problem,
so here I am! I managed to write to the database with only ParDo
operations, so I guess reading is also possible.
Some details about my use case:
* The Python SDK is used;
* Reading in the database is the first operation of the pipeline before
making some calculation;
* It is performed with SQLAlchemy, but could also be done with psycopg2;
* I don't think parallelizing this operation is necessary, as the
queries are and will stay really simple (e.g. SELECT foo FROM bar WHERE fuzz).
Here are my DoFn classes:
class ExtractFromPostgreSQLFn(beam.DoFn):
    """
    Extract PCollection from PostgreSQL
    """
    def start_bundle(self):
        self._session = Session()

    def process(self, element):
        raise NotImplementedError

    def finish_bundle(self):
        self._session.close()


class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
    def start_bundle(self, arg1, arg2):
        super(ReadEntityFromPostgresqlFn, self).start_bundle()
        self._arg1 = arg1
        self._arg2 = arg2

    def process(self, element):
        entities = (
            self._session.query(Entity)
            .filter(Entity.arg1 == self._arg1)
            .filter(Entity.arg2 == self._arg2)
            .all()
        )
        yield (self._arg1, self._arg2)
As I said, I used it just after the initialization of the pipeline:
p = beam.Pipeline(options=pipeline_options)
psql_entities = (
    p
    | "Extract Entities from PSQL backup" >> beam.ParDo(ReadEntityFromPostgresqlFn())
)
Unfortunately, I end up with an "AttributeError: 'PBegin' object has no
attribute 'windowing'" error.
Where did I make a mistake? I welcome any input you can provide on
this topic.
Thanks for your time,
Jonathan
Re: Source or ParDo for reading data in PostgreSQL in Python
Posted by Jonathan Perron <jo...@inuse.eu>.
I found that I forgot to perform a beam.Create() first, so there is no
need to answer me; this solves my problem.
Re: Source or ParDo for reading data in PostgreSQL in Python
Posted by Jonathan Perron <jo...@lumapps.com>.
Hello again,
Thanks to
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/estimate_pi.py,
I saw that my first step must be a beam.Create() with an iterable. Doing
so solved my problem.
Sorry for my mistake.
Re: Source or ParDo for reading data in PostgreSQL in Python
Posted by Pascal Gula <pa...@plantix.net>.
Hi Jonathan,
I had a similar requirement to yours, but for MongoDB, and I tentatively
wrote an IO connector for it that you can find here:
https://github.com/PEAT-AI/beam-extended
It is working with the DirectRunner in Read mode (I still need to test
it on Dataflow), but I faced some issues with the connector's
serializability in Write mode.
Long story short, pymongo is not serializable, and this is something you
will have to validate for SQLAlchemy.
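[Editor's note: Beam pickles DoFn instances when shipping them to workers, so anything stored on the instance at construction time must survive pickle.dumps; that is why the DoFn in the original message creates its Session in start_bundle rather than in __init__. A minimal, ORM-agnostic sketch of the check, with a lock standing in for an unpicklable connection object:]

```python
import pickle
import threading


def is_picklable(obj):
    """Return True if obj survives pickling (what Beam needs to ship a DoFn)."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


class EagerFn:
    """Anti-pattern: holds an unpicklable resource from construction time."""

    def __init__(self):
        self.conn = threading.Lock()  # stands in for a DB connection/session


class LazyFn:
    """Pattern from the thread: create the resource per bundle instead."""

    def __init__(self):
        self.conn = None  # nothing unpicklable on the instance yet

    def start_bundle(self):
        self.conn = threading.Lock()  # e.g. Session() in the real code
```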
Generally speaking, take a look at this part of the doc if you want to know
more about IO connector development:
https://beam.apache.org/documentation/sdks/python-custom-io/
I am not an expert but I'll be glad to help you since I would also need to
support PostgreSQL in the near future!
Cheers,
Pascal