Posted to user@beam.apache.org by Julien Chaty-Capelle <ju...@deepomatic.com> on 2022/03/25 10:09:29 UTC

Working with SDF

Hello there,
I'm currently working on moving parts of my company's architecture toward
something more cloud native. I ended up using Beam as it's quite
straightforward to use with our current stack.

I have some pipelines working at this time, but I need to move some of them
to SDFs (splittable DoFns) to increase parallelism.
I tried to implement an SDF with a custom RestrictionTracker according to
the doc here https://beam.apache.org/documentation/programming-guide/ and
the code here
https://github.com/apache/beam/blob/8e217ea0d1f383ef5033ef507b14d01edf9c67e6/sdks/python/apache_beam/io/iobase.py
(a minimal sketch of the kind of SDF I mean follows the list below), but a
few things were surprising and some help would be welcome.


   1. First, I had to delete the compiled Cython common.pxd file, as it was
   preventing Beam from finding my restriction param (reinstalling apache-beam
   through pip didn't solve the problem).
   2. It looks like with an SDF the setup and teardown methods of my DoFn are
   never called.
   3. check_done is called on my restriction tracker before the process
   method is called on my DoFn. This is odd, as the documentation states that
   check_done should raise an error if there is unclaimed work (and no work
   is claimed if process hasn't been called yet).
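
To make points 2 and 3 concrete, here is a minimal sketch of the kind of SDF
I mean (not my actual code; the element fields are hypothetical, and it reuses
Beam's OffsetRestrictionTracker instead of my custom tracker):

import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider


class PartsRestrictionProvider(RestrictionProvider):
    # Hypothetical: each element carries a 'num_parts' field saying how much
    # work it represents; the restriction is an offset range over those parts.
    def initial_restriction(self, element):
        return OffsetRange(0, element['num_parts'])

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element, restriction):
        return restriction.size()


class ProcessPartsFn(beam.DoFn):
    def setup(self):
        print('tag:setup')  # point 2: I would expect to see this

    def process(self,
                element,
                tracker=beam.DoFn.RestrictionParam(PartsRestrictionProvider())):
        position = tracker.current_restriction().start
        while tracker.try_claim(position):
            yield (element['id'], position)
            position += 1

    def teardown(self):
        print('tag:teardown')  # point 2: and this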


For the record, I am using GroupIntoBatches later in the pipeline, which
looks like this (a rough Python sketch follows the list):

   - beam.Create (very small initial set)
   - beam.ParDo (regular DoFn)
   - beam.ParDo (SDF)
   - beam.ParDo (regular DoFn)
   - beam.GroupIntoBatches
   - beam.ParDo (regular DoFn)
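
In Beam Python terms, the shape is roughly this (placeholder DoFns, reusing
the ProcessPartsFn sketch above):

import apache_beam as beam


class PrepareFn(beam.DoFn):
    # Placeholder regular DoFn: turns each seed into an element with parts.
    def process(self, element):
        yield {'id': element, 'num_parts': 28}


class EnrichFn(beam.DoFn):
    # Placeholder regular DoFn: pass-through.
    def process(self, element):
        yield element


class ConsumeBatchFn(beam.DoFn):
    # Placeholder regular DoFn: consumes a (key, batch) pair.
    def process(self, keyed_batch):
        key, batch = keyed_batch
        print('tag:batch', key, len(list(batch)))


with beam.Pipeline() as pipeline:
    (pipeline
     | 'Seed' >> beam.Create(['a', 'b'])                # very small initial set
     | 'Prepare' >> beam.ParDo(PrepareFn())             # regular DoFn
     | 'Expand' >> beam.ParDo(ProcessPartsFn())         # the SDF
     | 'Enrich' >> beam.ParDo(EnrichFn())               # regular DoFn
     | 'Batch' >> beam.GroupIntoBatches(batch_size=10)  # expects (key, value) pairs
     | 'Consume' >> beam.ParDo(ConsumeBatchFn()))       # regular DoFn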


Any help would be greatly appreciated


-- 
Julien CHATY-CAPELLE

Full stack developer/Integrator
Deepomatic

Re: Working with SDF

Posted by Julien Chaty-Capelle <ju...@deepomatic.com>.
Thank you for your feedback!
I tried to make a simpler version of my problem here:
https://pastebin.com/sa1P2EEc (I added some print statements as I don't
really know how debugger-friendly Beam is).

I'm running on Ubuntu 20.04, Python 3.8.10 with Beam 2.37.0, like this
(direct runner): python -m my_file

I'm facing the following error when I run the command: AttributeError:
'apache_beam.runners.common.MethodWrapper' object has no attribute
'watermark_estimator_provider' [while running 'ParDo(Crawl)/pair'].
If I delete the runners/*.so and runners/*.pxd files in the Python lib
directory, the script starts properly, but I get the following output:

> tag:seed_process
> tag:initial_restriction
> tag:create_tracker
> tag:init
> tag:check_done 28
> tag:process
>

which basically shows that check_done is called on my tracker (with 28
parts to process) before the process method of my ParDo.
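
As a point of comparison, Beam's own OffsetRestrictionTracker follows the
documented contract: check_done raises while any of the restriction is still
unclaimed. A small illustrative snippet (not my actual tracker), using a
28-element range like in the output above:

from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker

tracker = OffsetRestrictionTracker(OffsetRange(0, 28))
tracker.try_claim(0)  # claim only the first position
# 27 positions are still unclaimed, so this raises instead of returning quietly
tracker.check_done()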


Re: Working with SDF

Posted by Pablo Estrada <pa...@google.com>.
Hi Julien!
Are you able to share some of your code? Whenever I write any code, I try
to write simple unit tests to verify my progress. If you have observed unit
tests where the setup/teardown are not called, then that's a bug and we
need to fix it.
In general - how are you testing these changes? Locally? On a distributed
runner?
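
For example, something along these lines with TestPipeline and assert_that
(a minimal sketch with a stand-in DoFn - swap in your own fn and expected
output):

import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class DoubleFn(beam.DoFn):
    # Stand-in for whatever DoFn you want to verify.
    def process(self, element):
        yield element * 2


class DoubleFnTest(unittest.TestCase):
    def test_doubles_each_element(self):
        with TestPipeline() as p:  # runs on the direct runner
            results = (
                p
                | beam.Create([1, 2, 3])
                | beam.ParDo(DoubleFn()))
            assert_that(results, equal_to([2, 4, 6]))


if __name__ == '__main__':
    unittest.main()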

FWIW, the general architecture that you're showing looks fine. There should
not be a problem with a pipeline that does what you show.
Best
-P.


Re: Working with SDF

Posted by Ahmet Altay <al...@google.com>.
/cc @Pablo Estrada <pa...@google.com> - might be able to help.
