Posted to user@beam.apache.org by Dmitry Demeshchuk <dm...@postmates.com> on 2017/07/21 21:06:15 UTC

[Python] Replace ParDo's DoFn after both are constructed

Hi list,

I'm trying to make a transformation function (let's call it ErrorSieve)
that would take a ParDo object as input and modify its underlying DoFn
object, basically adding extra logic on top of an underlying process()
method.

Ideally for me, the example usage would be:

```python
p | ErrorSieve(beam.ParDo(MyDoFn()))

# or

p | ErrorSieve(beam.FlatMap(lambda x: x + 1))
```

However, this would require me to butcher the internals of the ParDo
mechanism, especially since ParDo's make_fn() method gets called during
its transformation. My other thought was to make it a fair and square DoFn:

```python
p | beam.ParDo(ErrorSieve(MyDoFn()))
```

The only problem with this is that I can't use it with transforms like
FlatMap, which is a bit unfortunate.

Do you think it's worth investigating how to implement the first approach,
or should I just settle for the second approach, using only custom
DoFns?
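
For the FlatMap case, one Beam-free way to picture the idea is to wrap the callable itself rather than the DoFn. The sketch below is only an illustration: `sieve` and the `'main'`/`'errors'` labels are made-up names, not Beam APIs, and the records are plain tuples rather than tagged outputs.

```python
import functools


def sieve(fn):
    """Wrap a plain callable so a failure becomes a labelled record instead of raising."""
    @functools.wraps(fn)
    def wrapped(element):
        try:
            return [('main', fn(element))]
        except Exception as e:
            # Keep the offending element next to a printable form of the error.
            return [('errors', (element, repr(e)))]
    return wrapped


add_one = sieve(lambda x: x + 1)
# Flatten the per-element lists, the way a FlatMap-style step would.
results = [record for element in [1, 2, "3", 4] for record in add_one(element)]
```

Since `wrapped` returns a list, it has the shape a FlatMap callable is expected to return, so the intent would be something like `beam.FlatMap(sieve(lambda x: x + 1))`. Routing the two labels to separate outputs is left out here.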

Thank you.

-- 
Best regards,
Dmitry Demeshchuk.

Re: [Python] Replace ParDo's DoFn after both are constructed

Posted by Dmitry Demeshchuk <dm...@postmates.com>.
Oops, that was dumb. I'm starting to see value in using typed languages.

Thanks a lot, Robert!

On Fri, Jul 21, 2017 at 5:09 PM, Robert Bradshaw <ro...@google.com>
wrote:

> The line
>
>         return pcoll.pipeline | beam.ParDo(dofn)
>
> should read
>
>         return pcoll | beam.ParDo(dofn)
>


-- 
Best regards,
Dmitry Demeshchuk.

Re: [Python] Replace ParDo's DoFn after both are constructed

Posted by Robert Bradshaw <ro...@google.com>.
The line

        return pcoll.pipeline | beam.ParDo(dofn)

should read

        return pcoll | beam.ParDo(dofn)


On Fri, Jul 21, 2017 at 3:25 PM, Dmitry Demeshchuk <dm...@postmates.com>
wrote:

> Yep, that's what I ended up doing.

Re: [Python] Replace ParDo's DoFn after both are constructed

Posted by Dmitry Demeshchuk <dm...@postmates.com>.
Yep, that's what I ended up doing.

Here's the full code:

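import traceback

import apache_beam as beam


class ErrorSieve(beam.PTransform):
    class ErrorSieveDoFn(beam.DoFn):
        def __init__(self, dofn, save_exceptions=False, save_stacktraces=False):
            self.dofn = dofn
            self.save_exceptions = save_exceptions
            self.save_stacktraces = save_stacktraces

        def process(self, *args, **kwargs):
            try:
                # process() returns an iterable or None, so iterate over it here
                # instead of yielding the iterable itself.
                for output in self.dofn.process(*args, **kwargs) or []:
                    yield output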
            except Exception as e:
                result = [args, kwargs]
                if self.save_exceptions:
                    result = result + [e]
                if self.save_stacktraces:
                    result = result + [traceback.extract_stack()]
                yield beam.pvalue.TaggedOutput('errors', result)

    def __init__(self, pardo, save_exceptions=False, save_stacktraces=False):
        self.fn = pardo.fn
        self.save_exceptions = save_exceptions
        self.save_stacktraces = save_stacktraces

    def expand(self, pcoll):
        dofn = ErrorSieve.ErrorSieveDoFn(
            self.fn, self.save_exceptions, self.save_stacktraces)
        return pcoll.pipeline | beam.ParDo(dofn)
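
A side note on `save_stacktraces`: `traceback.extract_stack()` describes the stack of whatever code calls it (here, the `except` block), not the traceback of the exception being handled. If the goal is to record where the inner function failed, `traceback.format_exc()` is probably closer to what is wanted. The sketch below is plain Python with hypothetical helper names (`failing`, `stack_names_in_handler`, `formatted_exception`) and shows the difference.

```python
import traceback


def failing(x):
    # Hypothetical stand-in for user code that raises on bad input.
    return x + 1


def stack_names_in_handler(x):
    try:
        failing(x)
    except Exception:
        # extract_stack() lists the frames leading to *this* call,
        # so the frame that raised (failing) is not among them.
        stack = traceback.extract_stack()
        return [frame.name for frame in stack]


def formatted_exception(x):
    try:
        failing(x)
    except Exception:
        # format_exc() renders the traceback of the exception being handled.
        return traceback.format_exc()
```

The string returned by `format_exc()` includes the `failing` frame and the exception message, and it is a plain `str`, which is likely easier to emit as an element than frame summaries or the exception object.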

However, when I try to use it with a TestPipeline() object, I get the
following exception:

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:183: in __exit__
    self.run().wait_until_finish()
../venv/lib/python2.7/site-packages/apache_beam/testing/test_pipeline.py:96: in run
    result = super(TestPipeline, self).run()
../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:167: in run
    self.to_runner_api(), self.runner, self._options).run(False)
../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:347: in to_runner_api
    root_transform_id = context.transforms.get_id(self._root_transform())
../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58: in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:539: in to_runner_api
    subtransforms=[context.transforms.get_id(part) for part in self.parts],
../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58: in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:539: in to_runner_api
    subtransforms=[context.transforms.get_id(part) for part in self.parts],
../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58: in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:544: in to_runner_api
    for tag, out in self.named_outputs().items()},
../venv/lib/python2.7/site-packages/apache_beam/pipeline.py:543: in <dictcomp>
    outputs={str(tag): context.pcollections.get_id(out)
../venv/lib/python2.7/site-packages/apache_beam/runners/pipeline_context.py:58: in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
../venv/lib/python2.7/site-packages/apache_beam/pvalue.py:139: in to_runner_api
    self.windowing))
../venv/lib/python2.7/site-packages/apache_beam/pvalue.py:121: in windowing
    self.producer.inputs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <ParDo(PTransform) label=[ParDo(ErrorSieveDoFn)] at 0x10b750590>
inputs = (<PBegin[None.None] at 0x10b7504d0>,)

    def get_windowing(self, inputs):
      """Returns the window function to be associated with transform's output.

        By default most transforms just return the windowing function associated
        with the input PCollection (or the first input if several).
        """
      # TODO(robertwb): Assert all input WindowFns compatible.
>     return inputs[0].windowing
E     AttributeError: 'PBegin' object has no attribute 'windowing'

../venv/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py:343: AttributeError

The test code goes like this:

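import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline

class ErrorSieveTest(unittest.TestCase):
    def test_with_data_only(self):
        with TestPipeline() as p:
            results = (
                p
                | beam.Create([1, 2, "3", 4])
                | ErrorSieve(beam.Map(lambda x: x + 1))
            )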

I don't know much about how windowing works under the hood, so will have to
do some digging.

In the meantime, would appreciate any insight into this.

Thanks!

On Fri, Jul 21, 2017 at 3:07 PM, Robert Bradshaw <ro...@google.com>
wrote:

> Rather than mutating the DoFn itself, I would create a new DoFn that
> wraps and invokes the inner one, e.g.
>
> class ErrorSieveParDo(beam.DoFn):
>   def __init__(self, dofn):
>     self._dofn = dofn
>   def process(self, *args, **kwargs):
>     # note that raw_result is an *iterable* or None
>     raw_result = self._dofn.process(*args, **kwargs)
>     ...
>   # don't forget start/finish_bundle if needed
>
> output_pcoll = input_pcoll | ParDo(ErrorSieveParDo(SomeDoFn()))
>
> It's correct that you can't use this in Map/FlatMap. Alternatively, make a
> transform
>
> class ErrorSieve(PTransform):
>   def __init__(self, pardo):
>     self.fn = pardo.fn
>   def expand(self, input_pcoll):
>     return input_pcoll | ParDo(ErrorSieveParDo(self.fn))
>
> output_pcoll = input_pcoll | ErrorSieve(ParDo(SomeDoFn()))
>
> Even better, however, might be to do something like
>
> class ErrorSieve(PTransform):
>   def __init__(self, pardo):
>     self.fn = pardo.fn
>   def expand(self, pcoll_input):
>     return pcoll_input | ParDo(self.fn) | ParDo(AnotherDoFn())
>
> Where AnotherDoFn filters/modifies the outputs (assuming it can be done
> elementwise).


-- 
Best regards,
Dmitry Demeshchuk.

Re: [Python] Replace ParDo's DoFn after both are constructed

Posted by Robert Bradshaw <ro...@google.com>.
Rather than mutating the DoFn itself, I would create a new DoFn that wraps
and invokes the inner one, e.g.

class ErrorSieveParDo(beam.DoFn):
  def __init__(self, dofn):
    self._dofn = dofn
  def process(self, *args, **kwargs):
    # note that raw_result is an *iterable* or None
    raw_result = self._dofn.process(*args, **kwargs)
    ...
  # don't forget start/finish_bundle if needed

output_pcoll = input_pcoll | ParDo(ErrorSieveParDo(SomeDoFn()))
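
To make the comment about `raw_result` concrete, here is a plain-Python sketch (no Beam import, so it runs anywhere). `CountingFn` and `SieveWrapper` are hypothetical stand-ins, and the `('main', ...)` / `('errors', ...)` tuples stand in for tagged outputs; in a real pipeline the wrapper would subclass `beam.DoFn`.

```python
class CountingFn(object):
    """Hypothetical stand-in for a DoFn; not a Beam class."""

    def __init__(self):
        self.bundles_started = 0

    def start_bundle(self):
        self.bundles_started += 1

    def process(self, element):
        # A generator: nothing runs until the result is iterated.
        yield element + 1


class SieveWrapper(object):
    """Plain-Python sketch of a wrapping DoFn."""

    def __init__(self, dofn):
        self._dofn = dofn

    def start_bundle(self):
        # Forward lifecycle calls so the inner object still sees them.
        self._dofn.start_bundle()

    def process(self, element):
        try:
            raw_result = self._dofn.process(element)
            # Materialise inside the try: a lazy generator raises only when
            # iterated, and process() may also return None.
            outputs = list(raw_result) if raw_result is not None else []
        except Exception as e:
            yield ('errors', element, repr(e))
            return
        for output in outputs:
            yield ('main', output)
```

Collecting the outputs into a list before yielding makes each element all-or-nothing: if the inner generator fails halfway, nothing from that element reaches the main output. Yielding directly inside the `try` would also catch the error, but only after earlier outputs had already been emitted.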

It's correct that you can't use this in Map/FlatMap. Alternatively, make a
transform

class ErrorSieve(PTransform):
  def __init__(self, pardo):
    self.fn = pardo.fn
  def expand(self, input_pcoll):
    return input_pcoll | ParDo(ErrorSieveParDo(self.fn))

output_pcoll = input_pcoll | ErrorSieve(ParDo(SomeDoFn()))

Even better, however, might be to do something like

class ErrorSieve(PTransform):
  def __init__(self, pardo):
    self.fn = pardo.fn
  def expand(self, pcoll_input):
    return pcoll_input | ParDo(self.fn) | ParDo(AnotherDoFn())

Where AnotherDoFn filters/modifies the outputs (assuming it can be done
elementwise).


On Fri, Jul 21, 2017 at 2:46 PM, Dmitry Demeshchuk <dm...@postmates.com>
wrote:

> Hi Sourabh,
>
> Great call, thanks. I was thinking about a slightly different interface,
> but this is exactly the direction I wanted to go. So, my approach, I guess,
> would be something like that:
>
> class ErrorSieve(PTransform):
>   def __init__(self, pardo):
>     self.fn = pardo.fn
>   def expand(self, pcoll):
>     inner_process = self.fn.process
>     def process(*args, **kwargs):
>       raw_result = inner_process(*args, **kwargs)
>       result = ...
>       yield result
>     setattr(self.fn, 'process', process)
>     return pcoll | ParDo(self.fn)
>
> I'll share a working snippet once it's done.
>
> On Fri, Jul 21, 2017 at 2:10 PM, Sourabh Bajaj <so...@google.com>
> wrote:
>
>> Hi,
>>
>> Is it possible to create
>>
>> class ErrorSieve(PTransform):
>>    def __init__ (dofn):
>>    def expand():
>>          return ParDo(modifiedDoFn)
>>
>> that way your pipeline just looks like p | ErrorSieve(DoFn()) and you
>> don't expose the ParDo to the user.
>>
>> Will this work for your usecase?
>>
>> -Sourabh
>
>
> --
> Best regards,
> Dmitry Demeshchuk.
>

Re: [Python] Replace ParDo's DoFn after both are constructed

Posted by Sourabh Bajaj <so...@google.com>.
Got it. Thanks

On Fri, Jul 21, 2017 at 3:04 PM Dmitry Demeshchuk <dm...@postmates.com>
wrote:

> The motivation for using ParDo is the following. What if I had code like
> this:
>
> ```
> p | beam.FlatMap(lambda x: x + 1)
> ```
>
> and wanted to have ErrorSieve on top of it? There's no explicitly defined
> DoFn in that expression, after all.
>
>
>
> On Fri, Jul 21, 2017 at 3:01 PM, Sourabh Bajaj <so...@google.com>
> wrote:
>
>> Thanks. One thing I'm not sure about: why do you need to pass the
>> ParDo to ErrorSieve instead of just the DoFn, given that you're only
>> modifying the DoFn?
>>
>> On Fri, Jul 21, 2017 at 2:47 PM Dmitry Demeshchuk <dm...@postmates.com>
>> wrote:
>>
>>> Hi Sourabh,
>>>
>>> Great call, thanks. I was thinking about a slightly different interface,
>>> but this is exactly the direction I wanted to go. So, my approach, I guess,
>>> would be something like that:
>>>
>>> class ErrorSieve(PTransform):
>>>   def __init__(self, pardo):
>>>     self.fn = pardo.fn
>>>   def expand(self):
>>>     inner_process = self.fn.process
>>>     def process(self, *args, **kwargs):
>>>       raw_result = inner_process(*args, **kwargs)
>>>       result = ...
>>>       yield result
>>>     setattr(fn, 'process', process)
>>>     return ParDo(self.fn)
>>>
>>> ​
>>>
>>> I'll share a working snippet once it's done.
>>>
>>> On Fri, Jul 21, 2017 at 2:10 PM, Sourabh Bajaj <so...@google.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Is it possible to create
>>>>
>>>> class ErrorSieve(PTransform):
>>>>    def __init__ (dofn):
>>>>    def expand():
>>>>          return ParDo(modifiedDoFn)
>>>>
>>>> that way your pipeline just looks like p | ErrorSieve(DoFn()) and you
>>>> don't expose the ParDo to the user.
>>>>
>>>> Will this work for your usecase?
>>>>
>>>> -Sourabh
>>>>
>>>> On Fri, Jul 21, 2017 at 2:06 PM Dmitry Demeshchuk <dm...@postmates.com>
>>>> wrote:
>>>>
>>>>> Hi list,
>>>>>
>>>>> I'm trying to make a transformation function (let's call it
>>>>> ErrorSieve) that would take a ParDo object as input and modify its
>>>>> underlying DoFn object, basically adding extra logic on top of an
>>>>> underlying process() method.
>>>>>
>>>>> Ideally for me, the example usage would be:
>>>>>
>>>>> ```python
>>>>> p | ErrorSieve(beam.ParDo(MyDoFn())
>>>>>
>>>>> or
>>>>>
>>>>> p | ErrorSieve(beam.FlatMap(lambda x: x + 1))
>>>>> ```
>>>>>
>>>>> However, this would require me to butcher the internals of ParDo
>>>>> mechanisms, especially since ParDo's make_fn() method gets called during
>>>>> its transformation. My other thinking was to make it a fair and square DoFn:
>>>>>
>>>>> ```python
>>>>> p | beam.ParDo(ErrorSieve(MyDoFn())
>>>>> ```
>>>>>
>>>>> The only problem with this is that I can't use it with transforms like
>>>>> FlatMap, which is a bit unfortunate.
>>>>>
>>>>> Do you think it's worth investigating how to implement the first
>>>>> approach, or should I just instead settle with the second approach, using
>>>>> only custom DoFns?
>>>>>
>>>>> Thank you.
>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Dmitry Demeshchuk.
>>>>>
>>>>
>>>
>>>
>>> --
>>> Best regards,
>>> Dmitry Demeshchuk.
>>>
>>
>
>
> --
> Best regards,
> Dmitry Demeshchuk.
>

Re: [Python] Replace ParDo's DoFn after both are constructed

Posted by Dmitry Demeshchuk <dm...@postmates.com>.
The motivation for using ParDo is the following. What if I had code like
this:

```
p | beam.FlatMap(lambda x: x + 1)
```

and wanted to have ErrorSieve on top of it? There's no explicitly defined
DoFn in that expression, after all.



On Fri, Jul 21, 2017 at 3:01 PM, Sourabh Bajaj <so...@google.com>
wrote:

> Thanks. One thing I'm not sure about: why do you need to pass the ParDo
> to ErrorSieve instead of just passing the DoFn, given that you're only
> modifying the DoFn?


-- 
Best regards,
Dmitry Demeshchuk.

Re: [Python] Replace ParDo's DoFn after both are constructed

Posted by Sourabh Bajaj <so...@google.com>.
Thanks. One thing I'm not sure about: why do you need to pass the ParDo
to ErrorSieve instead of just passing the DoFn, given that you're only
modifying the DoFn?

On Fri, Jul 21, 2017 at 2:47 PM Dmitry Demeshchuk <dm...@postmates.com>
wrote:

> Hi Sourabh,
>
> Great call, thanks. I was thinking about a slightly different interface,
> but this is exactly the direction I wanted to go. So my approach, I guess,
> would be something like this:
>
> from apache_beam import ParDo, PTransform
>
> class ErrorSieve(PTransform):
>   def __init__(self, pardo):
>     super().__init__()
>     self.fn = pardo.fn
>   def expand(self, pcoll):
>     inner_process = self.fn.process  # bound method of the wrapped DoFn
>     def process(*args, **kwargs):
>       raw_result = inner_process(*args, **kwargs)
>       result = ...
>       yield result
>     self.fn.process = process
>     return pcoll | ParDo(self.fn)
>
> I'll share a working snippet once it's done.

Re: [Python] Replace ParDo's DoFn after both are constructed

Posted by Dmitry Demeshchuk <dm...@postmates.com>.
Hi Sourabh,

Great call, thanks. I was thinking about a slightly different interface,
but this is exactly the direction I wanted to go. So my approach, I guess,
would be something like this:

from apache_beam import ParDo, PTransform

class ErrorSieve(PTransform):
  def __init__(self, pardo):
    super().__init__()
    self.fn = pardo.fn
  def expand(self, pcoll):
    inner_process = self.fn.process  # bound method of the wrapped DoFn
    def process(*args, **kwargs):
      raw_result = inner_process(*args, **kwargs)
      result = ...
      yield result
    self.fn.process = process
    return pcoll | ParDo(self.fn)

I'll share a working snippet once it's done.
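
For illustration only, here is a minimal plain-Python sketch of the
instance-patching idea in the snippet above, with no Beam dependency. AddOne
is a hypothetical stand-in for a DoFn and patch_process is a made-up helper
name. The detail it shows: a function assigned to an instance attribute is
not bound, so the replacement takes no `self` and calls the bound method
captured beforehand.

```python
class AddOne:
    """Hypothetical stand-in for a DoFn: process() yields output elements."""

    def process(self, element):
        yield element + 1


def patch_process(fn):
    """Replace fn.process on this one instance with a wrapping generator."""
    inner_process = fn.process  # bound method, captured before patching

    def process(*args, **kwargs):
        # No `self` parameter: functions stored on an instance are not bound.
        for value in inner_process(*args, **kwargs):
            yield ("wrapped", value)

    fn.process = process
    return fn


patched = patch_process(AddOne())
print(list(patched.process(1)))  # [('wrapped', 2)]
```

Only the patched instance changes; a fresh AddOne() still yields plain
values.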

On Fri, Jul 21, 2017 at 2:10 PM, Sourabh Bajaj <so...@google.com>
wrote:

> Hi,
>
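> Is it possible to create a PTransform along these lines?
>
> class ErrorSieve(PTransform):
>     def __init__(self, dofn):
>         super().__init__()
>         self.dofn = dofn
>     def expand(self, pcoll):
>         # modifiedDoFn: a DoFn that wraps self.dofn with the extra logic
>         return pcoll | ParDo(modifiedDoFn)
>
> That way your pipeline just looks like p | ErrorSieve(DoFn()), and you
> don't expose the ParDo to the user.
>
> Would this work for your use case?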
>
> -Sourabh


-- 
Best regards,
Dmitry Demeshchuk.

Re: [Python] Replace ParDo's DoFn after both are constructed

Posted by Sourabh Bajaj <so...@google.com>.
Hi,

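Is it possible to create a PTransform along these lines?

class ErrorSieve(PTransform):
    def __init__(self, dofn):
        super().__init__()
        self.dofn = dofn
    def expand(self, pcoll):
        # modifiedDoFn: a DoFn that wraps self.dofn with the extra logic
        return pcoll | ParDo(modifiedDoFn)

That way your pipeline just looks like p | ErrorSieve(DoFn()), and you don't
expose the ParDo to the user.

Would this work for your use case?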

-Sourabh
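
As a rough sketch of what the modifiedDoFn placeholder could look like, here
is a plain-Python version that runs without Beam. SievedFn and Reciprocal are
hypothetical names; in a real pipeline the wrapper would presumably subclass
beam.DoFn and be handed to ParDo. The error handling shown (tagging each
output) is an assumption for illustration, not what ErrorSieve actually does.

```python
class Reciprocal:
    """Hypothetical stand-in for a user DoFn."""

    def process(self, element):
        yield 1 / element


class SievedFn:
    """Wraps another DoFn-like object and tags each result or failure."""

    def __init__(self, dofn):
        self.dofn = dofn

    def process(self, element):
        try:
            # Materialize inside the try block so that errors raised lazily
            # by a generator are caught here as well.
            results = list(self.dofn.process(element))
        except Exception as exc:
            yield ("error", element, type(exc).__name__)
            return
        for result in results:
            yield ("ok", result)


sieved = SievedFn(Reciprocal())
outputs = [out for x in (2, 0) for out in sieved.process(x)]
print(outputs)  # [('ok', 0.5), ('error', 0, 'ZeroDivisionError')]
```

Composition like this leaves the user's object untouched, at the cost of
only working when an explicit DoFn-like object is available to wrap.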

On Fri, Jul 21, 2017 at 2:06 PM Dmitry Demeshchuk <dm...@postmates.com>
wrote:

> Hi list,
>
> I'm trying to make a transformation function (let's call it ErrorSieve)
> that would take a ParDo object as input and modify its underlying DoFn
> object, basically adding extra logic on top of an underlying process()
> method.
>
> Ideally for me, the example usage would be:
>
> ```python
> p | ErrorSieve(beam.ParDo(MyDoFn()))
>
> # or
>
> p | ErrorSieve(beam.FlatMap(lambda x: x + 1))
> ```
>
> However, this would require me to butcher the internals of ParDo
> mechanisms, especially since ParDo's make_fn() method gets called during
> its transformation. My other thinking was to make it a fair and square DoFn:
>
> ```python
> p | beam.ParDo(ErrorSieve(MyDoFn()))
> ```
>
> The only problem with this is that I can't use it with transforms like
> FlatMap, which is a bit unfortunate.
>
> Do you think it's worth investigating how to implement the first approach,
> or should I just instead settle with the second approach, using only custom
> DoFns?
>
> Thank you.
>
>
> --
> Best regards,
> Dmitry Demeshchuk.
>