Posted to user@beam.apache.org by Jonathan Perron <jo...@lumapps.com> on 2018/10/01 09:44:03 UTC

Fwd: [Need advices] Troubles to create custom PTransform in Python

Hello everybody,

I am new to Apache Beam and I need some advice on how to properly write 
a custom PTransform in Python.

I am trying to read entities from a PostgreSQL database using SQLAlchemy. I 
followed the examples in the documentation for Pub/Sub 
(https://beam.apache.org/documentation/sdks/pydoc/2.5.0/_modules/apache_beam/io/gcp/pubsub.html#ReadFromPubSub) 
and Datastore 
(https://beam.apache.org/documentation/sdks/pydoc/2.5.0/_modules/apache_beam/io/gcp/datastore/v1/datastoreio.html#ReadFromDatastore).

Here is what I have achieved so far:

class ExtractFromPostgreSQLFn(beam.DoFn):
    """
    Extract PCollection from PostgreSQL
    """

    def start_bundle(self):
        self._session = Session()

    def process(self, element):
        raise NotImplementedError

    def finish_bundle(self):
        self._session.close()


class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
    def start_bundle(self, arg1, arg2):
        self._arg1 = arg1
        self._arg2 = arg2

    def process(self, element):
        entities = (
            self._session.query(Entity)
            .filter(Entity.arg1 == self._arg1)
            .filter(Entity.arg2 == self._arg2)
            .all()
        )
        return (self._arg1, entities)


class ExtractEntity(PTransform):
    """
    A ```PTransform``` that extracts the Entity.
    """

    def __init__(self, arg1, arg2="enabled"):
        """
        Initializes ```ExtractEntity```
        Args:
            arg1
            arg2
        """
        if not arg1:
            raise ValueError("arg1 cannot be empty")

        self._arg1 = arg1
        self._arg2 = arg2

    def expand(self, pcoll):
        """
        This composite transform involves the following:
        1. Create a query object
        2. Run the query against the database.
        """
        database_entities = pcoll.pipeline | "Extract datastore users" >> beam.ParDo(
            ReadEntityFromPostgresqlFn(self._arg1, self.arg2)
        )
        return database_entities

The PTransform is called at the very beginning of my pipeline:

p = beam.Pipeline(options=pipeline_options)
database = (
    p
    | "Extract datastore users"
    >> ExtractDatastoreUsers(arg1="123456")
)

It keeps raising AttributeError: 'PBegin' object has no attribute 
'windowing'.

Please note that it's only a draft (I will extract several entities from 
the database, so the query will not be "hard-coded" but passed as a 
parameter at some point).

I therefore have several questions:

1) Does anyone know why what I am trying to achieve is not working?

2) Is this the right way to proceed, i.e. creating a custom PTransform 
which executes ParDo operations, or should I go directly with ParDo 
operations?

3) Apart from 
https://beam.apache.org/contribute/ptransform-style-guide/#language-neutral-considerations, 
is there a guide on how to properly write a custom PTransform?

4) Is it better to use the low-level psycopg2 driver here, or is using 
SQLAlchemy fine?

Many thanks in advance for your time and help!

Jonathan




Re: [Need advices] Troubles to create custom PTransform in Python

Posted by Jonathan Perron <jo...@lumapps.com>.
Thanks for your answer!

It seems to go against what is stated here, 
https://beam.apache.org/documentation/io/authoring-overview/#read-transforms, 
which says that one can also use a ParDo operation. So should I remove 
the PTransform and read directly from the database using a custom ParDo?


On 01/10/2018 at 15:32, Abhijit Chanda wrote:
>
> For further reference, see the Beam documentation on designing your 
> pipeline 
> <https://beam.apache.org/documentation/pipelines/design-your-pipeline/>
>
> From: Abhijit Chanda <ab...@gammanalytics.com>
> Date: Monday, 1 October 2018 at 6:58 PM
> To: Jonathan Perron <jo...@lumapps.com>, 
> "user@beam.apache.org" <us...@beam.apache.org>
> Subject: Re: [Need advices] Troubles to create custom PTransform in 
> Python
>
> You can't call a transform function as the very first step in a 
> pipeline. The first block should always be a source, followed by any 
> number of transforms depending on your use case.


Re: [Need advices] Troubles to create custom PTransform in Python

Posted by Abhijit Chanda <ab...@gammanalytics.com>.
For further reference, see the Beam documentation on designing your pipeline: <https://beam.apache.org/documentation/pipelines/design-your-pipeline/>




Re: [Need advices] Troubles to create custom PTransform in Python

Posted by Abhijit Chanda <ab...@gammanalytics.com>.
You can't call a transform function as the very first step in a pipeline. The first block should always be a source, followed by any number of transforms depending on your use case.
