Posted to dev@beam.apache.org by Sam Bourne <sa...@gmail.com> on 2023/06/20 20:56:25 UTC

Re: Best patterns for a polling transform

+dev to see if anyone has any suggestions.

On Fri, Jun 16, 2023 at 5:46 PM Sam Bourne <sa...@gmail.com> wrote:

> Hello beam community!
>
> I’m having trouble coming up with the best pattern to *eagerly* poll. By
> eagerly, I mean that elements should be consumed and yielded as soon as
> possible. I’ve tried a handful of experiments, and my latest attempt using
> the timer API seems quite promising but behaves in a way that I find
> rather unintuitive. My solution was to create a sort of recursive timer
> callback, for which I found one example
> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/transforms/userstate_test.py#L797>
> in the Beam test code.
>
> I have a few questions:
>
> 1) The code below runs fine with a single worker, but with multiple
> workers there are duplicate values. It seems that the callback and a
> snapshot of the state are provided to multiple workers, and the number of
> duplicates increases with the number of workers. Is this due to the values
> being provided to timer.set?
>
> 2) I’m using TimeDomain.WATERMARK here because REAL_TIME simply does not
> work. The docs
> <https://beam.apache.org/documentation/programming-guide/#state-and-timers>
> seem to suggest REAL_TIME would be the way to do this, but there seems to
> be no guarantee that a REAL_TIME callback will run. In this sample, a
> REAL_TIME timer never fires the callback. Interestingly, if you call
> timer.set with any value less than the current time.time(), the callback
> does run, but it seems to fire immediately regardless of the value (and in
> this sample it actually raises an AssertionError
> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L943>
> ).
>
> I’m happy for suggestions!
> -Sam
>
> import random
> import threading
>
> import apache_beam as beam
> import apache_beam.coders as coders
> import apache_beam.transforms.combiners as combiners
> import apache_beam.transforms.userstate as userstate
> import apache_beam.utils.timestamp as timestamp
> from apache_beam.options.pipeline_options import PipelineOptions
>
>
> class Log(beam.PTransform):
>
>     lock = threading.Lock()
>
>     @classmethod
>     def _log(cls, element, label):
>         with cls.lock:
>             # This just colors the print in terminal
>             print('\033[1m\033[92m{}\033[0m : {!r}'.format(label, element))
>         return element
>
>     def expand(self, pcoll):
>         return pcoll | beam.Map(self._log, self.label)
>
>
> class EagerProcess(beam.DoFn):
>
>     BUFFER_STATE = userstate.BagStateSpec('buffer', coders.PickleCoder())
>     POLL_TIMER = userstate.TimerSpec('timer', beam.TimeDomain.WATERMARK)
>
>     def process(
>             self,
>             element,
>             buffer=beam.DoFn.StateParam(BUFFER_STATE),
>             timer=beam.DoFn.TimerParam(POLL_TIMER),
>     ):
>         _, item = element
>
>         for i in range(item):
>             buffer.add(i)
>
>         timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>
>     @userstate.on_timer(POLL_TIMER)
>     def flush(
>             self,
>             buffer=beam.DoFn.StateParam(BUFFER_STATE),
>             timer=beam.DoFn.TimerParam(POLL_TIMER),
>     ):
>         cache = buffer.read()
>         buffer.clear()
>
>         requeue = False
>         for item in cache:
>             if random.random() < 0.1:
>                 yield item
>             else:
>                 buffer.add(item)
>                 requeue = True
>
>         if requeue:
>             timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>
>
> def main():
>     options = PipelineOptions.from_dictionary({
>         'direct_num_workers': 3,
>         'direct_running_mode': 'multi_threading',
>     })
>
>     pipe = beam.Pipeline(options=options)
>     (
>         pipe
>         | beam.Create([10])
>         | 'Init' >> Log()
>         | beam.Reify.Timestamp()
>         | 'PairWithKey' >> beam.Map(lambda x: (hash(x), x))
>         | beam.ParDo(EagerProcess())
>         | 'Complete' >> Log()
>         | beam.transforms.combiners.Count.Globally()
>         | 'Count' >> Log()
>     )
>     result = pipe.run()
>     result.wait_until_finish()
>
>
> if __name__ == '__main__':
>     main()
>
>
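
The re-arming ("recursive") timer pattern described in the message above can be sketched without Beam. This is a minimal single-threaded model, not Beam code: `ToyTimerLoop`, `poll_eagerly`, and the `is_ready` predicate are hypothetical names introduced here, and the deterministic `is_ready` stands in for the `random.random() < 0.1` check in the sample. It only shows the control flow: flush the buffer, emit whatever is ready, put the rest back, and set the timer again while anything remains.

```python
import heapq
import itertools


class ToyTimerLoop:
    """Tiny stand-in for a runner's timer queue (illustrative, not Beam)."""

    def __init__(self):
        self._queue = []               # heap of (fire_time, seq, callback)
        self._seq = itertools.count()  # tie-breaker so callbacks are never compared
        self.now = 0

    def set_timer(self, fire_time, callback):
        heapq.heappush(self._queue, (fire_time, next(self._seq), callback))

    def run(self):
        # Fire timers in time order until none are left.
        while self._queue:
            fire_time, _, callback = heapq.heappop(self._queue)
            self.now = max(self.now, fire_time)
            callback()


def poll_eagerly(items, is_ready, interval=10):
    """Emit each item at the first poll where is_ready(item, now) is true.

    Note: this never terminates if some item is never ready.
    """
    loop = ToyTimerLoop()
    buffer = list(items)
    emitted = []

    def flush():
        pending = []
        for item in buffer:
            if is_ready(item, loop.now):
                emitted.append((loop.now, item))  # yield as soon as possible
            else:
                pending.append(item)              # keep for the next poll
        buffer[:] = pending
        if buffer:
            # Re-arm the timer from inside its own callback.
            loop.set_timer(loop.now + interval, flush)

    loop.set_timer(interval, flush)  # the first timer, as set in process()
    loop.run()
    return emitted


if __name__ == '__main__':
    # Item i becomes ready at time 10 * (i + 1).
    print(poll_eagerly(range(5), lambda item, now: now >= 10 * (item + 1)))
```

In this model there is exactly one owner of the buffer, so each item is emitted once; that is the behavior the single-worker run in the message shows.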

Re: Best patterns for a polling transform

Posted by Sam Bourne <sa...@gmail.com>.
> The streaming support in Python direct runner is currently rather limited

In this experiment I was running a batch pipeline instead of a streaming
one. Are there any known issues using timers with a batch pipeline?

> It sounds like we should identify whether this is a problem in the SDK or
> in the DirectRunner implementation

I’m trying to understand how things are intended to work so that I know
whether I’ve encountered a bug. For example, when setting a timer, is it
reasonable to expect that timer to fire even if all elements in the
pipeline have finished processing? I can see the argument either way
depending on the use case, for example batching RPCs versus garbage
collection. Could the timer API benefit from being able to specify whether
or not the timer is guaranteed to run?

This polling problem doesn’t seem unusual, so I’m surprised it hasn’t come
up before. If anyone knows of better techniques for doing this, I’m open to
ideas!
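
To make the "guaranteed to run" question concrete, here is a toy model of what could happen to pending timers when a bounded input is exhausted. It is a sketch under stated assumptions, not a description of any runner: it assumes the watermark jumps to +infinity at end of input (so every event-time timer becomes due), that processing-time timers not yet due are dropped unless marked otherwise, and the `guaranteed` flag is purely hypothetical and not part of Beam's timer API.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class PendingTimer:
    name: str
    domain: str               # 'event_time' or 'processing_time'
    fire_at: float
    guaranteed: bool = False  # hypothetical flag, NOT a Beam API


def resolve_at_end_of_input(timers, wall_clock):
    """Split pending timers into (fired, dropped) names at end of input."""
    fired, dropped = [], []
    for t in timers:
        if t.domain == 'event_time':
            # Assumed model: the final watermark is +infinity.
            due = t.fire_at <= math.inf
        else:
            due = t.fire_at <= wall_clock
        if due or t.guaranteed:
            fired.append(t.name)
        else:
            dropped.append(t.name)
    return fired, dropped


if __name__ == '__main__':
    timers = [
        PendingTimer('flush_rpc_batch', 'processing_time', 110, guaranteed=True),
        PendingTimer('gc_state', 'processing_time', 110),
        PendingTimer('event_flush', 'event_time', 110),
    ]
    print(resolve_at_end_of_input(timers, wall_clock=100))
```

Under these assumptions the RPC batch flush fires because it asked for the guarantee, the garbage-collection timer is dropped because nothing depends on it, and the event-time timer fires regardless.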

On Thu, Jun 22, 2023 at 7:32 AM Valentyn Tymofieiev via dev <
dev@beam.apache.org> wrote:

> > The below code runs fine with a single worker but with multiple workers
> there are duplicate values.
> > I’m using TimeDomain.WATERMARK here due to it simply not working when
> using REAL_TIME. The docs seem to suggest REAL_TIME would be the way to do
> this, however there seems to be no guarantee that a REAL_TIME callback will
> run.
>
> It seems that you are using Python direct runner for experimentation. The
> streaming support in Python direct runner is currently rather limited:
> https://github.com/apache/beam/issues/21987 , it is possible that direct
> runner doesn't correctly implement the streaming semantics. It sounds like
> we should identify whether this is a problem in the SDK or in the
> DirectRunner implementation, and file issues accordingly. Streaming direct
> runner issues use this umbrella issue:
> https://github.com/apache/beam/issues/21987. I would also experiment with
> FlinkRunner or DataflowRunner. Also the streaming semantics behavior should
> be consistent across SDK, so different behavior between Python and  Java
> SDK would implicate an SDK bug.
>
>
> On Thu, Jun 22, 2023 at 10:00 AM Chad Dombrova <ch...@gmail.com> wrote:
>
>> I’m also interested in the answer to this.  This is essential for reading
>> from many types of data sources.
>>
>>
>> On Tue, Jun 20, 2023 at 2:57 PM Sam Bourne <sa...@gmail.com> wrote:
>>
>>> +dev to see if anyone has any suggestions.
>>>
>>> On Fri, Jun 16, 2023 at 5:46 PM Sam Bourne <sa...@gmail.com> wrote:
>>>
>>>> Hello beam community!
>>>>
>>>> I’m having trouble coming up with the best pattern to *eagerly* poll.
>>>> By eagerly, I mean that elements should be consumed and yielded as soon as
>>>> possible. There are a handful of experiments that I’ve tried and my latest
>>>> attempt using the timer API seems quite promising, but is operating in a
>>>> way that I find rather unintuitive. My solution was to create a sort of
>>>> recursive timer callback - which I found one example
>>>> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/transforms/userstate_test.py#L797>
>>>> of within the beam test code.
>>>>
>>>> I have a few questions:
>>>>
>>>> 1) The below code runs fine with a single worker but with multiple
>>>> workers there are duplicate values. It seems that the callback and snapshot
>>>> of the state is provided to multiple workers and the number of duplications
>>>> increases with the number of workers. Is this due to the values being
>>>> provided to timer.set?
>>>>
>>>> 2) I’m using TimeDomain.WATERMARK here due to it simply not working
>>>> when using REAL_TIME. The docs
>>>> <https://beam.apache.org/documentation/programming-guide/#state-and-timers>
>>>> seem to suggest REAL_TIME would be the way to do this, however there
>>>> seems to be no guarantee that a REAL_TIME callback will run. In this
>>>> sample setting the timer to REAL_TIME will simply not ever fire the
>>>> callback. Interestingly, if you call timer.set with any value less
>>>> than the current time.time(), then the callback will run, however it
>>>> seems to fire immediately regardless of the value (and in this sample will
>>>> actually raise an AssertionError
>>>> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L943>
>>>> ).
>>>>
>>>> I’m happy for suggestions!
>>>> -Sam
>>>>
>>>> import random
>>>> import threading
>>>>
>>>> import apache_beam as beam
>>>> import apache_beam.coders as coders
>>>> import apache_beam.transforms.combiners as combiners
>>>> import apache_beam.transforms.userstate as userstate
>>>> import apache_beam.utils.timestamp as timestamp
>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>
>>>>
>>>> class Log(beam.PTransform):
>>>>
>>>>     lock = threading.Lock()
>>>>
>>>>     @classmethod
>>>>     def _log(cls, element, label):
>>>>         with cls.lock:
>>>>             # This just colors the print in terminal
>>>>             print('\033[1m\033[92m{}\033[0m : {!r}'.format(label, element))
>>>>         return element
>>>>
>>>>     def expand(self, pcoll):
>>>>         return pcoll | beam.Map(self._log, self.label)
>>>>
>>>>
>>>> class EagerProcess(beam.DoFn):
>>>>
>>>>     BUFFER_STATE = userstate.BagStateSpec('buffer', coders.PickleCoder())
>>>>     POLL_TIMER = userstate.TimerSpec('timer', beam.TimeDomain.WATERMARK)
>>>>
>>>>     def process(
>>>>             self,
>>>>             element,
>>>>             buffer=beam.DoFn.StateParam(BUFFER_STATE),
>>>>             timer=beam.DoFn.TimerParam(POLL_TIMER),
>>>>     ):
>>>>         _, item = element
>>>>
>>>>         for i in range(item):
>>>>             buffer.add(i)
>>>>
>>>>         timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>>>>
>>>>     @userstate.on_timer(POLL_TIMER)
>>>>     def flush(
>>>>             self,
>>>>             buffer=beam.DoFn.StateParam(BUFFER_STATE),
>>>>             timer=beam.DoFn.TimerParam(POLL_TIMER),
>>>>     ):
>>>>         cache = buffer.read()
>>>>         buffer.clear()
>>>>
>>>>         requeue = False
>>>>         for item in cache:
>>>>             if random.random() < 0.1:
>>>>                 yield item
>>>>             else:
>>>>                 buffer.add(item)
>>>>                 requeue = True
>>>>
>>>>         if requeue:
>>>>             timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>>>>
>>>>
>>>> def main():
>>>>     options = PipelineOptions.from_dictionary({
>>>>         'direct_num_workers': 3,
>>>>         'direct_running_mode': 'multi_threading',
>>>>     })
>>>>
>>>>     pipe = beam.Pipeline(options=options)
>>>>     (
>>>>         pipe
>>>>         | beam.Create([10])
>>>>         | 'Init' >> Log()
>>>>         | beam.Reify.Timestamp()
>>>>         | 'PairWithKey' >> beam.Map(lambda x: (hash(x), x))
>>>>         | beam.ParDo(EagerProcess())
>>>>         | 'Complete' >> Log()
>>>>         | beam.transforms.combiners.Count.Globally()
>>>>         | 'Count' >> Log()
>>>>     )
>>>>     result = pipe.run()
>>>>     result.wait_until_finish()
>>>>
>>>>
>>>> if __name__ == '__main__':
>>>>     main()
>>>>
>>>>

Re: Best patterns for a polling transform

Posted by Valentyn Tymofieiev via user <us...@beam.apache.org>.
> The below code runs fine with a single worker but with multiple workers
> there are duplicate values.
>
> I’m using TimeDomain.WATERMARK here due to it simply not working when
> using REAL_TIME. The docs seem to suggest REAL_TIME would be the way to do
> this, however there seems to be no guarantee that a REAL_TIME callback will
> run.

It seems that you are using the Python direct runner for experimentation.
Streaming support in the Python direct runner is currently rather limited,
so it is possible that the direct runner doesn't implement the streaming
semantics correctly. We should identify whether this is a problem in the
SDK or in the DirectRunner implementation, and file issues accordingly.
Streaming direct runner issues are tracked under this umbrella issue:
https://github.com/apache/beam/issues/21987. I would also experiment with
FlinkRunner or DataflowRunner. Streaming semantics should also be
consistent across SDKs, so different behavior between the Python and Java
SDKs would point to an SDK bug.
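
When comparing runs across runners, it may help to check the collected outputs mechanically instead of reading the log. The helper below is a plain-Python sketch: `check_run` is a hypothetical name, and it assumes the outputs of the 'Complete' step have already been gathered into a list by whatever means the runner allows. For the sample in this thread, the expected values are `range(10)`.

```python
from collections import Counter


def check_run(outputs, expected):
    """Summarize how a run's outputs differ from the expected values."""
    counts = Counter(outputs)
    expected = set(expected)
    return {
        # Values emitted more than once, with how often they appeared.
        'duplicates': {v: n for v, n in counts.items() if n > 1},
        # Expected values that never appeared.
        'missing': sorted(expected - set(counts)),
        # Values that appeared but were not expected.
        'unexpected': sorted(set(counts) - expected),
    }


if __name__ == '__main__':
    # Example: 1 and 2 were duplicated and 3 never arrived.
    print(check_run([0, 1, 1, 2, 2, 2], range(4)))
```

Running the same pipeline on two runners and comparing the two reports is one way to narrow down whether the duplicates follow the runner or the DoFn.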


On Thu, Jun 22, 2023 at 10:00 AM Chad Dombrova <ch...@gmail.com> wrote:

> I’m also interested in the answer to this.  This is essential for reading
> from many types of data sources.
>
>
> On Tue, Jun 20, 2023 at 2:57 PM Sam Bourne <sa...@gmail.com> wrote:
>
>> +dev to see if anyone has any suggestions.
>>
>> On Fri, Jun 16, 2023 at 5:46 PM Sam Bourne <sa...@gmail.com> wrote:
>>
>>> Hello beam community!
>>>
>>> I’m having trouble coming up with the best pattern to *eagerly* poll.
>>> By eagerly, I mean that elements should be consumed and yielded as soon as
>>> possible. There are a handful of experiments that I’ve tried and my latest
>>> attempt using the timer API seems quite promising, but is operating in a
>>> way that I find rather unintuitive. My solution was to create a sort of
>>> recursive timer callback - which I found one example
>>> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/transforms/userstate_test.py#L797>
>>> of within the beam test code.
>>>
>>> I have a few questions:
>>>
>>> 1) The below code runs fine with a single worker but with multiple
>>> workers there are duplicate values. It seems that the callback and snapshot
>>> of the state is provided to multiple workers and the number of duplications
>>> increases with the number of workers. Is this due to the values being
>>> provided to timer.set?
>>>
>>> 2) I’m using TimeDomain.WATERMARK here due to it simply not working
>>> when using REAL_TIME. The docs
>>> <https://beam.apache.org/documentation/programming-guide/#state-and-timers>
>>> seem to suggest REAL_TIME would be the way to do this, however there
>>> seems to be no guarantee that a REAL_TIME callback will run. In this
>>> sample setting the timer to REAL_TIME will simply not ever fire the
>>> callback. Interestingly, if you call timer.set with any value less than
>>> the current time.time(), then the callback will run, however it seems
>>> to fire immediately regardless of the value (and in this sample will
>>> actually raise an AssertionError
>>> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L943>
>>> ).
>>>
>>> I’m happy for suggestions!
>>> -Sam
>>>
>>> import random
>>> import threading
>>>
>>> import apache_beam as beam
>>> import apache_beam.coders as coders
>>> import apache_beam.transforms.combiners as combiners
>>> import apache_beam.transforms.userstate as userstate
>>> import apache_beam.utils.timestamp as timestamp
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>
>>>
>>> class Log(beam.PTransform):
>>>
>>>     lock = threading.Lock()
>>>
>>>     @classmethod
>>>     def _log(cls, element, label):
>>>         with cls.lock:
>>>             # This just colors the print in terminal
>>>             print('\033[1m\033[92m{}\033[0m : {!r}'.format(label, element))
>>>         return element
>>>
>>>     def expand(self, pcoll):
>>>         return pcoll | beam.Map(self._log, self.label)
>>>
>>>
>>> class EagerProcess(beam.DoFn):
>>>
>>>     BUFFER_STATE = userstate.BagStateSpec('buffer', coders.PickleCoder())
>>>     POLL_TIMER = userstate.TimerSpec('timer', beam.TimeDomain.WATERMARK)
>>>
>>>     def process(
>>>             self,
>>>             element,
>>>             buffer=beam.DoFn.StateParam(BUFFER_STATE),
>>>             timer=beam.DoFn.TimerParam(POLL_TIMER),
>>>     ):
>>>         _, item = element
>>>
>>>         for i in range(item):
>>>             buffer.add(i)
>>>
>>>         timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>>>
>>>     @userstate.on_timer(POLL_TIMER)
>>>     def flush(
>>>             self,
>>>             buffer=beam.DoFn.StateParam(BUFFER_STATE),
>>>             timer=beam.DoFn.TimerParam(POLL_TIMER),
>>>     ):
>>>         cache = buffer.read()
>>>         buffer.clear()
>>>
>>>         requeue = False
>>>         for item in cache:
>>>             if random.random() < 0.1:
>>>                 yield item
>>>             else:
>>>                 buffer.add(item)
>>>                 requeue = True
>>>
>>>         if requeue:
>>>             timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>>>
>>>
>>> def main():
>>>     options = PipelineOptions.from_dictionary({
>>>         'direct_num_workers': 3,
>>>         'direct_running_mode': 'multi_threading',
>>>     })
>>>
>>>     pipe = beam.Pipeline(options=options)
>>>     (
>>>         pipe
>>>         | beam.Create([10])
>>>         | 'Init' >> Log()
>>>         | beam.Reify.Timestamp()
>>>         | 'PairWithKey' >> beam.Map(lambda x: (hash(x), x))
>>>         | beam.ParDo(EagerProcess())
>>>         | 'Complete' >> Log()
>>>         | beam.transforms.combiners.Count.Globally()
>>>         | 'Count' >> Log()
>>>     )
>>>     result = pipe.run()
>>>     result.wait_until_finish()
>>>
>>>
>>> if __name__ == '__main__':
>>>     main()
>>>
>>>

Re: Best patterns for a polling transform

Posted by Chad Dombrova <ch...@gmail.com>.
I’m also interested in the answer to this.  This is essential for reading
from many types of data sources.


On Tue, Jun 20, 2023 at 2:57 PM Sam Bourne <sa...@gmail.com> wrote:

> +dev to see if anyone has any suggestions.
>
> On Fri, Jun 16, 2023 at 5:46 PM Sam Bourne <sa...@gmail.com> wrote:
>
>> Hello beam community!
>>
>> I’m having trouble coming up with the best pattern to *eagerly* poll. By
>> eagerly, I mean that elements should be consumed and yielded as soon as
>> possible. There are a handful of experiments that I’ve tried and my latest
>> attempt using the timer API seems quite promising, but is operating in a
>> way that I find rather unintuitive. My solution was to create a sort of
>> recursive timer callback - which I found one example
>> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/transforms/userstate_test.py#L797>
>> of within the beam test code.
>>
>> I have a few questions:
>>
>> 1) The below code runs fine with a single worker but with multiple
>> workers there are duplicate values. It seems that the callback and snapshot
>> of the state is provided to multiple workers and the number of duplications
>> increases with the number of workers. Is this due to the values being
>> provided to timer.set?
>>
>> 2) I’m using TimeDomain.WATERMARK here due to it simply not working when
>> using REAL_TIME. The docs
>> <https://beam.apache.org/documentation/programming-guide/#state-and-timers>
>> seem to suggest REAL_TIME would be the way to do this, however there
>> seems to be no guarantee that a REAL_TIME callback will run. In this
>> sample setting the timer to REAL_TIME will simply not ever fire the
>> callback. Interestingly, if you call timer.set with any value less than
>> the current time.time(), then the callback will run, however it seems to
>> fire immediately regardless of the value (and in this sample will actually
>> raise an AssertionError
>> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L943>
>> ).
>>
>> I’m happy for suggestions!
>> -Sam
>>
>> import random
>> import threading
>>
>> import apache_beam as beam
>> import apache_beam.coders as coders
>> import apache_beam.transforms.combiners as combiners
>> import apache_beam.transforms.userstate as userstate
>> import apache_beam.utils.timestamp as timestamp
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>>
>> class Log(beam.PTransform):
>>
>>     lock = threading.Lock()
>>
>>     @classmethod
>>     def _log(cls, element, label):
>>         with cls.lock:
>>             # This just colors the print in terminal
>>             print('\033[1m\033[92m{}\033[0m : {!r}'.format(label, element))
>>         return element
>>
>>     def expand(self, pcoll):
>>         return pcoll | beam.Map(self._log, self.label)
>>
>>
>> class EagerProcess(beam.DoFn):
>>
>>     BUFFER_STATE = userstate.BagStateSpec('buffer', coders.PickleCoder())
>>     POLL_TIMER = userstate.TimerSpec('timer', beam.TimeDomain.WATERMARK)
>>
>>     def process(
>>             self,
>>             element,
>>             buffer=beam.DoFn.StateParam(BUFFER_STATE),
>>             timer=beam.DoFn.TimerParam(POLL_TIMER),
>>     ):
>>         _, item = element
>>
>>         for i in range(item):
>>             buffer.add(i)
>>
>>         timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>>
>>     @userstate.on_timer(POLL_TIMER)
>>     def flush(
>>             self,
>>             buffer=beam.DoFn.StateParam(BUFFER_STATE),
>>             timer=beam.DoFn.TimerParam(POLL_TIMER),
>>     ):
>>         cache = buffer.read()
>>         buffer.clear()
>>
>>         requeue = False
>>         for item in cache:
>>             if random.random() < 0.1:
>>                 yield item
>>             else:
>>                 buffer.add(item)
>>                 requeue = True
>>
>>         if requeue:
>>             timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>>
>>
>> def main():
>>     options = PipelineOptions.from_dictionary({
>>         'direct_num_workers': 3,
>>         'direct_running_mode': 'multi_threading',
>>     })
>>
>>     pipe = beam.Pipeline(options=options)
>>     (
>>         pipe
>>         | beam.Create([10])
>>         | 'Init' >> Log()
>>         | beam.Reify.Timestamp()
>>         | 'PairWithKey' >> beam.Map(lambda x: (hash(x), x))
>>         | beam.ParDo(EagerProcess())
>>         | 'Complete' >> Log()
>>         | beam.transforms.combiners.Count.Globally()
>>         | 'Count' >> Log()
>>     )
>>     result = pipe.run()
>>     result.wait_until_finish()
>>
>>
>> if __name__ == '__main__':
>>     main()
>>
>>
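
For anyone following the thread without a Beam environment, the control flow of the quoted flush() callback can be sketched in plain Python. This is only a simulation under stated assumptions: it uses no Beam APIs, says nothing about how any runner schedules timers, and does not reproduce or explain the duplicates seen with multiple workers. The names poll_until_drained and is_ready are hypothetical and exist only for this illustration. Each loop iteration stands in for one timer firing: the buffer is read and cleared, ready items are emitted, the rest are re-buffered, and the "timer" is re-armed only while something is still pending. Unlike the quoted process(), which always sets the timer once, this sketch never fires on an empty input.

```python
import random


def poll_until_drained(items, is_ready, interval=10):
    """Simulate a self re-arming timer that drains a buffer of pending items.

    Yields (fire_time, item) the first time is_ready(item) returns True.
    Hypothetical helper for illustration only; not part of Apache Beam.
    """
    buffer = list(items)
    now = 0
    while buffer:  # plays the role of "if requeue: timer.set(...)"
        now += interval  # the timer fires one interval later
        pending, buffer = buffer, []  # read the buffer, then clear it
        for item in pending:
            if is_ready(item):
                yield now, item  # emit as soon as the item is ready
            else:
                buffer.append(item)  # keep it for the next firing


if __name__ == "__main__":
    rng = random.Random(0)  # seeded so repeated runs behave the same
    for fired_at, item in poll_until_drained(
            range(10), lambda _: rng.random() < 0.1):
        print(fired_at, item)
```

In this single-threaded model every item is emitted exactly once, which matches the single-worker behaviour described in question 1 and can serve as a baseline when comparing against multi-worker output.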
