Posted to user@beam.apache.org by Jonathan Perron <jo...@lumapps.com> on 2018/10/01 09:44:03 UTC
Fwd: [Need advices] Troubles to create custom PTransform in Python
Hello everybody,
I am new to Apache Beam and I need some advice on how to properly write
a custom PTransform in Python.
I am trying to read entities from a PostgreSQL database using SQLAlchemy. I
followed the examples in the documentation for Pub/Sub
(https://beam.apache.org/documentation/sdks/pydoc/2.5.0/_modules/apache_beam/io/gcp/pubsub.html#ReadFromPubSub)
and Datastore
(https://beam.apache.org/documentation/sdks/pydoc/2.5.0/_modules/apache_beam/io/gcp/datastore/v1/datastoreio.html#ReadFromDatastore).
Here is what I achieved so far:
class ExtractFromPostgreSQLFn(beam.DoFn):
    """
    Extract PCollection from PostgreSQL
    """

    def start_bundle(self):
        self._session = Session()

    def process(self, element):
        raise NotImplementedError

    def finish_bundle(self):
        self._session.close()


class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
    def start_bundle(self, arg1, arg2):
        self._arg1 = arg1
        self._arg2 = arg2

    def process(self, element):
        entities = (
            self._session.query(Entity)
            .filter(Entity.arg1 == self._arg1)
            .filter(Entity.arg2 == self._arg2)
            .all()
        )
        return (self._arg1, entities)


class ExtractEntity(PTransform):
    """
    A ```PTransform``` Extract of the Entity.
    """

    def __init__(self, arg1, arg2="enabled"):
        """
        Inializes ```ExtractEntity```
        Args:
            arg1
            arg2
        """
        if not arg1:
            raise ValueError("arg1 cannot be empty")

        self._arg1 = arg1
        self._arg2 = arg2

    def expand(self, pcoll):
        """
        This is a composite transform involves the following:
          1. Create a query object
          2. Run the query against the database.
        """
        database_entities = pcoll.pipeline | "Extract datastore users" >> beam.ParDo(
            ReadEntityFromPostgresqlFn(self._arg1, self.arg2)
        )
        return database_entities
The PTransform is called at the very beginning of my pipeline:
p = beam.Pipeline(options=pipeline_options)
database = (
    p
    | "Extract datastore users"
    >> ExtractDatastoreUsers(arg1="123456")
)
It keeps raising AttributeError: 'PBegin' object has no attribute
'windowing'.
Please note that it's only a draft (I will extract several entities from
the database, so the query will not be "hard-coded" but passed as a
parameter at some point).
I have thus several questions:
1) Does anyone know why what I am trying to achieve is not working?
2) Is this the right way to proceed, i.e. creating a custom PTransform
which executes ParDo operations, or should I go directly with ParDo
operations?
3) Apart from
https://beam.apache.org/contribute/ptransform-style-guide/#language-neutral-considerations,
is there a guide on how to properly write a custom PTransform?
4) Is it better to use the low-level psycopg2 driver here, or is
SQLAlchemy fine?
Many thanks in advance for your time and help!
Jonathan
Re: [Need advices] Troubles to create custom PTransform in Python
Posted by Jonathan Perron <jo...@lumapps.com>.
Thanks for your answer!
This somewhat goes against what is described at
https://beam.apache.org/documentation/io/authoring-overview/#read-transforms,
which states that one can also use a ParDo-based read. So should I remove
the PTransform and read directly from the database with a custom ParDo?
On 01/10/2018 at 15:32, Abhijit Chanda wrote:
Re: [Need advices] Troubles to create custom PTransform in Python
Posted by Abhijit Chanda <ab...@gammanalytics.com>.
Further, refer to the Beam documentation on design-your-pipeline <https://beam.apache.org/documentation/pipelines/design-your-pipeline/>
Re: [Need advices] Troubles to create custom PTransform in Python
Posted by Abhijit Chanda <ab...@gammanalytics.com>.
You can’t call a transform function as the very first step in a pipeline. The first block should always be a source, followed by any number of transforms based upon your use case.