Posted to dev@beam.apache.org by Ismaël Mejía <ie...@gmail.com> on 2020/02/03 08:50:08 UTC

Re: Dynamic timers now supported!

I had a discussion with Rehman last week and we discovered that the
TimerMap-related tests were not running for all runners because they were
not tagged as part of the ValidatesRunner category. I opened a PR [1] to
enable this, so could someone please help me with the review/merge?

Out of curiosity I took a look and discovered that they are only passing
for the Direct runner and for the classic Flink runner in batch mode. They
are not passing for Dataflow [2][3] or for the portable Flink runner, so it
is probably worth reopening the issue to investigate/fix.

[1] https://github.com/apache/beam/pull/10747
[2] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
[3] https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/


On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com> wrote:

> Yes. For now we exclude the flink runner, but fixing this should be fairly
> trivial.
>
> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <mx...@apache.org> wrote:
>
>> The Flink Runner used to allow setting a timer multiple times before we
>> made it comply with the Beam semantics of overwriting past invocations.
>> I wouldn't be surprised if the Spark Runner never addressed this. Flink
>> and Spark themselves allow a timer to be set multiple times. In order
>> to fix this for Beam, the Flink Runner has to maintain a checkpointed
>> map which sits outside of its built-in TimerService.
>>
>> As far as I can see, multiple timer families are currently not supported
>> in the Flink Runner due to the map not taking the family name into
>> account. This can be easily fixed though.
>>
>> -Max
>>
>> On 24.01.20 21:31, Reuven Lax wrote:
>> > The new timer family is in the portability protos. I think TimerReceiver
>> > needs to be updated to set it though (I think a 1-line change).
>> >
>> > The TimerInternals class that runners implement today already handles
>> > dynamic timers, so most of the work was in the Beam SDK to provide an
>> > API that allows users to access this feature.
>> >
>> > The main work needed in the runner was to take into account the timer
>> > family. Beam semantics say that if a timer is set twice with the same
>> > id, then the second timer overwrites the first. Several runners
>> > therefore had maps from timer id -> timer. However, since the
>> > timer family scopes the timers, we now allow two timers with the same
>> > id as long as the timer families are different. Runners had to be
>> > updated to include the timer family id in the map keys.
>> >
>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>> > ValidatesRunner, even though the Spark runner wasn't updated! I wonder
>> > if this means that the Spark runner was incorrectly implementing the
>> > Beam semantics before, and setTimer was not overwriting timers with the
>> > same id?
>> >
>> > Reuven
>> >
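The keying change Reuven describes above can be sketched in plain Java. This is an illustration only, not code from any Beam runner: `TimerKey`, `TimerStore`, and their methods are hypothetical names, and firing times are simplified to `long` milliseconds. The sketch shows that setting a timer twice with the same family and id overwrites the earlier one, while the same id in a different family is tracked separately.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical composite key: timers are unique per (family, id), not per id alone. */
final class TimerKey {
    final String familyId;
    final String timerId;

    TimerKey(String familyId, String timerId) {
        this.familyId = familyId;
        this.timerId = timerId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TimerKey)) {
            return false;
        }
        TimerKey other = (TimerKey) o;
        return familyId.equals(other.familyId) && timerId.equals(other.timerId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(familyId, timerId);
    }
}

/** Hypothetical stand-in for the timer map a runner might keep per key. */
final class TimerStore {
    private final Map<TimerKey, Long> pending = new HashMap<>();

    /** Setting a timer replaces any earlier timer with the same family and id. */
    void setTimer(String familyId, String timerId, long fireAtMillis) {
        pending.put(new TimerKey(familyId, timerId), fireAtMillis);
    }

    /** Returns the pending firing time, or null if no such timer is set. */
    Long firingTime(String familyId, String timerId) {
        return pending.get(new TimerKey(familyId, timerId));
    }

    /** Number of distinct pending timers. */
    int size() {
        return pending.size();
    }
}
```

If the map were keyed by timer id alone, a timer named "mainTimer" in one family would silently overwrite a "mainTimer" in another, which is the bug class the runner updates were meant to avoid.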
>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <iemejia@gmail.com
>> > <ma...@gmail.com>> wrote:
>> >
>> >     This looks great, thanks for the contribution Rehman!
>> >
>> >     I have some questions (note I have not looked at the code at all).
>> >
>> >     - Is this working for both portable and non-portable runners?
>> >     - What do other runners need to implement to support this (e.g.
>> >       Spark)?
>> >
>> >     It may be worth adding this to the website Compatibility Matrix.
>> >
>> >     Regards,
>> >     Ismaël
>> >
>> >
>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>> >     <rehman.muradali@venturedive.com
>> >     <ma...@venturedive.com>> wrote:
>> >
>> >         Thank you Reuven for the guidance throughout the development
>> >         process. I am delighted to contribute my two cents to the Beam
>> >         project.
>> >
>> >         Looking forward to more active contributions.
>> >
>> >         Thanks & Regards,
>> >         Rehman Murad Ali
>> >
>> >
>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <relax@google.com
>> >         <ma...@google.com>> wrote:
>> >
>> >             Thanks to a lot of hard work by Rehman, Beam now supports
>> >             dynamic timers. As a reminder, this was discussed on the dev
>> >             list some time back.
>> >
>> >             As background, previously you had to statically declare all
>> >             timers in your code. So if you wanted to have two timers,
>> >             you needed to create two timer variables and two callbacks -
>> >             one for each timer. A number of users kept hitting stumbling
>> >             blocks where they needed a dynamic set of timers (often
>> >             based on the element), which was not supported in Beam. The
>> >             workarounds were quite ugly and complicated.
>> >
>> >             The new support allows declaring a TimerMap, which is a map
>> >             of timers. Each TimerMap is scoped by a family name, so you
>> >             can create multiple TimerMaps each with its own callback.
>> >             The use looks as follows:
>> >
>> >             class MyDoFn extends DoFn<...> {
>> >                 // Declares a family of dynamically named timers.
>> >                 @TimerFamily("timers")
>> >                 private final TimerSpec timerMap =
>> >                     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>> >
>> >                 @ProcessElement
>> >                 public void process(
>> >                         @TimerFamily("timers") TimerMap timers,
>> >                         @Element Type e,
>> >                         @Timestamp Instant timestamp) {
>> >                     timers.set("mainTimer", timestamp);
>> >                     // Timer ids can be computed from the element.
>> >                     timers.set("actionType" + e.getActionType(), timestamp);
>> >                 }
>> >
>> >                 // One callback per family; it receives the id that fired.
>> >                 @OnTimerFamily("timers")
>> >                 public void onTimer(@TimerId String timerId) {
>> >                     System.out.println("Timer fired. id: " + timerId);
>> >                 }
>> >             }
>> >
>> >             This currently works for the Flink and the Dataflow runners.
>> >
>> >             Thank you Rehman for getting this done! Beam users will find
>> >             it very valuable.
>> >
>> >             Reuven
>> >
>>
>

Re: Dynamic timers now supported!

Posted by Reuven Lax <re...@google.com>.
Thanks for figuring this out! I didn't know about the UsesUnboundedPCollections
category.

On Fri, Feb 14, 2020 at 12:57 PM Ismaël Mejía <ie...@gmail.com> wrote:

> Exactly: since the new tests use unbounded PCollections, we have to add the
> UsesUnboundedPCollections category.
> Also, the Flink runner was not excluding this category for the batch
> (bounded) tests.
> I opened this PR to fix it; PTAL, Reuven:
> https://github.com/apache/beam/pull/10871
>
> On Fri, Feb 14, 2020 at 8:52 PM Reuven Lax <re...@google.com> wrote:
>
>> This is running as part of the validatesRunnerBatch test, but it is
>> executing a streaming test. Maybe that's why it's failing?
>>
>> On Fri, Feb 14, 2020 at 9:42 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Ismael,
>>>
>>> As part of that fix I added some new tests to make sure these tests run
>>> on both batch and streaming runners, as I realized that the test was
>>> running only on batch runners before. I did this by explicitly setting the
>>> isBounded attribute on the output of the Create transform. Somehow these
>>> new tests I added are making the Flink runner unhappy.
>>>
>>> I'm not sure why explicitly setting the PCollection to be unbounded is
>>> breaking on the Flink runner. We can try to exclude the Flink runner from
>>> these tests for now, but maybe Max has an idea.
>>>
>>> Also Ismael, what makes this a batch mode test? When I look at the
>>> failing stack trace, the failure is in FlinkStreamingPipelineTranslator.
>>>
>>> Reuven
>>>
>>> On Fri, Feb 14, 2020 at 8:52 AM Ismaël Mejía <ie...@gmail.com> wrote:
>>>
>>>> Apparently the fix of Dynamic Timers for Dataflow broke the
>>>> ValidatesRunner tests for Flink in batch mode that were passing before.
>>>> Can you please take a look, Reuven or Rehman?
>>>> Tests have been failing since the exact commit for the fix:
>>>> 7719708a04d5d0eff3048dbd58ac1337889f8ba5
>>>> For details on the exception:
>>>>
>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/6608/
>>>>
>>>>
>>>> On Wed, Feb 12, 2020 at 10:37 AM Ismaël Mejía <ie...@gmail.com>
>>>> wrote:
>>>>
>>>>> Great to know you got it working on Dataflow easily, Reuven. As a new
>>>>> feature it looks great!
>>>>>
>>>>> I agree with Kenn: it may be worth opening a new thread to discuss the
>>>>> changes still needed to support this in portable runners.
>>>>>
>>>>> On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <ke...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> I think the (lack of) portability bit may have been buried in this
>>>>>> thread. Maybe a new thread about the design for that?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> FYI, this is now fixed for Dataflow. I also added better rejection
>>>>>>> so that runners that don't support this feature will reject the pipeline.
>>>>>>>
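One way to picture the "better rejection" mentioned above is an up-front capability check before a pipeline is run. The sketch below is hypothetical and is not the actual Beam change: `PipelineValidator`, its method, and the boolean capability flag are invented for illustration. It fails fast with `UnsupportedOperationException` instead of letting a runner silently ignore timer families.

```java
import java.util.List;

/** Hypothetical validator; names and parameters are illustrative only. */
final class PipelineValidator {
    private PipelineValidator() {}

    /**
     * Throws if the pipeline declares at least one timer family but the
     * target runner does not support timer families.
     */
    static void rejectUnsupportedTimerFamilies(
            String runnerName,
            boolean runnerSupportsTimerFamilies,
            List<String> declaredFamilies) {
        if (!runnerSupportsTimerFamilies && !declaredFamilies.isEmpty()) {
            throw new UnsupportedOperationException(
                runnerName + " does not support timer families: " + declaredFamilies);
        }
    }
}
```

Failing at validation time gives users a clear error message rather than timers that never fire.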
>>>>>>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>>>>>>
>>>>>>>> A larger question is how to support this in the portability layer.
>>>>>>>> Right now portability assumes that each timer id corresponds to a logical
>>>>>>>> input PCollection, but that assumption no longer works as we now support a
>>>>>>>> dynamic set of timers, each with its own id. We could instead model each
>>>>>>>> timer family as a PCollection, but the FnApiRunner would need to
>>>>>>>> dynamically get the timer id in order to invoke it, and today it statically
>>>>>>>> reads the timer id from the PCollection name.
>>>>>>>>
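The second model described above, one stream per timer family with the timer id carried dynamically, can be illustrated without any Beam types. This is a sketch under stated assumptions, not the FnApiRunner design: `FiredTimer` and `TimerFamilyDispatcher` are hypothetical names. Each fired timer carries its family and its id as data, and the dispatcher looks up one callback per family and passes the id through at invocation time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical firing record: the timer id travels with the data, not with a stream name. */
final class FiredTimer {
    final String familyId;
    final String timerId;

    FiredTimer(String familyId, String timerId) {
        this.familyId = familyId;
        this.timerId = timerId;
    }
}

/** Hypothetical dispatcher: one callback per timer family. */
final class TimerFamilyDispatcher {
    private final Map<String, Consumer<String>> callbacks = new HashMap<>();

    /** Registers the single callback that handles every timer in a family. */
    void register(String familyId, Consumer<String> onTimer) {
        callbacks.put(familyId, onTimer);
    }

    /** Routes by family, then hands the dynamically read timer id to the callback. */
    void dispatch(FiredTimer fired) {
        Consumer<String> callback = callbacks.get(fired.familyId);
        if (callback == null) {
            throw new IllegalStateException(
                "No callback registered for timer family " + fired.familyId);
        }
        callback.accept(fired.timerId);
    }
}
```

The contrast with the static model is that nothing here needs to know the set of timer ids ahead of time; only the set of families is fixed.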
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks for finding this. Hopefully the bug is easy to fix. The
>>>>>>>>> tests indeed never ran on any runner except for the DirectRunner, which is
>>>>>>>>> something I should've noticed in the code review.
>>>>>>>>>
>>>>>>>>> Reuven

Re: Dynamic timers now supported!

Posted by Ismaël Mejía <ie...@gmail.com>.
Exactly: since the new tests use unbounded PCollections, we have to add the
UsesUnboundedPCollections category.
Also, the Flink runner was not excluding this category for the batch
(bounded) tests.
I opened this PR to fix it; PTAL, Reuven:
https://github.com/apache/beam/pull/10871
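
How category-based selection decides whether a test runs can be sketched as a simple set check. This is a simplified model, not the Gradle or JUnit implementation: `CategoryFilter` and `shouldRun` are hypothetical, and categories are represented as plain strings. A test runs only if it carries at least one included category and none of the excluded ones, which is why a batch-only suite has to exclude UsesUnboundedPCollections explicitly.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of include/exclude category filtering. */
final class CategoryFilter {
    private final Set<String> included;
    private final Set<String> excluded;

    CategoryFilter(Set<String> included, Set<String> excluded) {
        // Defensive copies so later changes to the arguments do not affect the filter.
        this.included = new HashSet<>(included);
        this.excluded = new HashSet<>(excluded);
    }

    /** Runs a test only if it has an included category and no excluded category. */
    boolean shouldRun(Set<String> testCategories) {
        boolean hasIncluded = false;
        for (String category : testCategories) {
            if (excluded.contains(category)) {
                return false;
            }
            if (included.contains(category)) {
                hasIncluded = true;
            }
        }
        return hasIncluded;
    }
}
```

With "ValidatesRunner" included and nothing excluded, a test tagged with both "ValidatesRunner" and "UsesUnboundedPCollections" is selected for the batch suite, which matches the failure seen on Flink.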


Re: Dynamic timers now supported!

Posted by Reuven Lax <re...@google.com>.
This is running as part of the validatesRunnerBatch test, but it is
executing a streaming test. Maybe that's why it's failing?

On Fri, Feb 14, 2020 at 9:42 AM Reuven Lax <re...@google.com> wrote:

> Ismael,
>
> As part of that fix I added some new tests to make sure these tests run
> on both batch and streaming runners, as I realized that the test was
> running only on batch runners before. I did this by explicitly setting the
> isBounded attribute on the output of the Create transform. Somehow these
> new tests I added are making the Flink runner unhappy.
>
> I'm not sure why explicitly setting the PCollection to be unbounded is
> breaking on the Flink runner. We can try to exclude the Flink runner from
> these tests for now, but maybe Max has an idea.
>
> Also Ismael, what makes this a batch mode test? When I look at the failing
> stack trace, the failure is in FlinkStreamingPipelineTranslator.
>
> Reuven
>
> On Fri, Feb 14, 2020 at 8:52 AM Ismaël Mejía <ie...@gmail.com> wrote:
>
>> Apparently the fix of Dynamic Timers for Dataflow broke the
>> ValidatesRunner tests for Flink in batch mode that were passing before.
>> Can you please take a look, Reuven or Rehman?
>> Tests are failing since the exact commit for the fix:
>> 7719708a04d5d0eff3048dbd58ac1337889f8ba5
>> For details on the exception:
>>
>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/6608/
>>
>>
>> On Wed, Feb 12, 2020 at 10:37 AM Ismaël Mejía <ie...@gmail.com> wrote:
>>
>>> Great to know you get it working on Dataflow easily Reuven. As a new
>>> feature it
>>> looks great!
>>>
>>> Agree with Kenn maybe worth to open a new thread to discuss the changes
>>> still
>>> needed to support this in portable runners.
>>>
>>> On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>
>>>> I think the (lack of) portability bit may have been buried in this
>>>> thread. Maybe a new thread about the design for that?
>>>>
>>>> Kenn
>>>>
>>>> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> FYI, this is now fixed for Dataflow. I also added better rejection so
>>>>> that runners that don't support this feature will reject the pipeline.
>>>>>
>>>>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>>>>
>>>>>> A larger question is how to support this in the portability layer.
>>>>>> Right now portability assumes that each timer id corresponds to a logical
>>>>>> input PCollection, but that assumption no longer works as we now support a
>>>>>> dynamic set of timers, each with their own id. We could instead model each
>>>>>> timer family as a PCollection, but the FnApiRunner would need to
>>>>>> dynamically get the timer id in order to invoke it, and today it statically
>>>>>> reads the timer id from the PCollection name.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Thanks for finding this. Hopefully the bug is easy to fix. The
>>>>>>> tests indeed never ran on any runner except for the DirectRunner, which is
>>>>>>> something I should've noticed in the code review.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <ie...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I had a discussion with Rehman last week and we discovered that the
>>>>>>>> TimersMap
>>>>>>>> related tests were not running for all runners because they were
>>>>>>>> not tagged as
>>>>>>>> part of the ValidatesRunner category. I opened a PR [1] to enable
>>>>>>>> this, so
>>>>>>>> please someone help me with the review/merge.
>>>>>>>>
>>>>>>>> I took a look just for curiosity and discovered that they are only
>>>>>>>> passing for
>>>>>>>> Direct runner and for the classic Flink runner in batch mode. They
>>>>>>>> are not
>>>>>>>> passing for Dataflow [2][3] and for the Portable Flink runner, so
>>>>>>>> probably worth
>>>>>>>> to reopen the issue to investigate/fix.
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>>>>> [2]
>>>>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>>>>> [3]
>>>>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Yes. For now we exclude the flink runner, but fixing this should
>>>>>>>>> be fairly trivial.
>>>>>>>>>
>>>>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <mx...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> The Flink Runner was allowing to set a timer multiple times
>>>>>>>>>> before we
>>>>>>>>>> made it comply with the Beam semantics of overwriting past
>>>>>>>>>> invocations.
>>>>>>>>>> I wouldn't be surprised if the Spark Runner never addressed this.
>>>>>>>>>> Flink
>>>>>>>>>> and Spark themselves allow a timer to be set multiple times.
>>>>>>>>>> In order
>>>>>>>>>> to fix this for Beam, the Flink Runner has to maintain a
>>>>>>>>>> checkpointed
>>>>>>>>>> map which sits outside of its builtin TimerService.
>>>>>>>>>>
>>>>>>>>>> As far as I can see, multiple timer families are currently not
>>>>>>>>>> supported
>>>>>>>>>> in the Flink Runner due to the map not taking the family name
>>>>>>>>>> into
>>>>>>>>>> account. This can be easily fixed though.
>>>>>>>>>>
>>>>>>>>>> -Max
>>>>>>>>>>
>>>>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>>>>> > The new timer family is in the portability protos. I think
>>>>>>>>>> TimerReceiver
>>>>>>>>>> > needs to be updated to set it though (I think a 1-line change).
>>>>>>>>>> >
>>>>>>>>>> > The TimerInternals class that runners implement today already
>>>>>>>>>> handles
>>>>>>>>>> > dynamic timers, so most of the work was in the Beam SDK  to
>>>>>>>>>> provide an
>>>>>>>>>> > API that allows users to access this feature.
>>>>>>>>>> >
>>>>>>>>>> > The main work needed in the runner was to take into account the
>>>>>>>>>> timer
>>>>>>>>>> > family. Beam semantics say that if a timer is set twice with
>>>>>>>>>> the same
>>>>>>>>>> > id, then the second timer overwrites the first.  Several
>>>>>>>>>> runners
>>>>>>>>>> > therefore had maps from timer id -> timer. However since the
>>>>>>>>>> > timer family scopes the timers, we now allow two timers with
>>>>>>>>>> the same id
>>>>>>>>>> > as long as the timer families are different. Runners had to be
>>>>>>>>>> updated
>>>>>>>>>> > to include the timer family id in the map keys.
>>>>>>>>>> >
>>>>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>>>>>>>> wonder
>>>>>>>>>> > if this means that the Spark runner was incorrectly
>>>>>>>>>> implementing the
>>>>>>>>>> > Beam semantics before, and setTimer was not overwriting timers
>>>>>>>>>> with the
>>>>>>>>>> > same id?
>>>>>>>>>> >
>>>>>>>>>> > Reuven
>>>>>>>>>> >
>>>>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <iemejia@gmail.com
>>>>>>>>>> > <ma...@gmail.com>> wrote:
>>>>>>>>>> >
>>>>>>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>>>>>>> >
>>>>>>>>>> >     I have some questions (note I have not looked at the code
>>>>>>>>>> at all).
>>>>>>>>>> >
>>>>>>>>>> >     - Is this working for both portable and non portable
>>>>>>>>>> runners?
>>>>>>>>>> >     - What do other runners need to implement to support this
>>>>>>>>>> (e.g. Spark)?
>>>>>>>>>> >
>>>>>>>>>> >     Maybe worth to add this to the website Compatibility Matrix.
>>>>>>>>>> >
>>>>>>>>>> >     Regards,
>>>>>>>>>> >     Ismaël
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>>>>>>> >     <rehman.muradali@venturedive.com
>>>>>>>>>> >     <ma...@venturedive.com>> wrote:
>>>>>>>>>> >
>>>>>>>>>> >         Thank you Reuven for the guidance throughout the
>>>>>>>>>> development
>>>>>>>>>> >         process. I am delighted to contribute my two cents to
>>>>>>>>>> the Beam
>>>>>>>>>> >         project.
>>>>>>>>>> >
>>>>>>>>>> >         Looking forward to more active contributions.
>>>>>>>>>> >
>>>>>>>>>> >         *Thanks & Regards*
>>>>>>>>>> >
>>>>>>>>>> >         *Rehman Murad Ali*
>>>>>>>>>> >         Software Engineer
>>>>>>>>>> >         Mobile: +92 3452076766
>>>>>>>>>> >         Skype: rehman.muradali
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <
>>>>>>>>>> relax@google.com
>>>>>>>>>> >         <ma...@google.com>> wrote:
>>>>>>>>>> >
>>>>>>>>>> >             Thanks to a lot of hard work by Rehman, Beam now
>>>>>>>>>> supports
>>>>>>>>>> >             dynamic timers. As a reminder, this was discussed
>>>>>>>>>> on the dev
>>>>>>>>>> >             list some time back.
>>>>>>>>>> >
>>>>>>>>>> >             As background, previously one had to statically
>>>>>>>>>> declare all
>>>>>>>>>> >             timers in your code. So if you wanted to have two
>>>>>>>>>> timers,
>>>>>>>>>> >             you needed to create two timer variables and two
>>>>>>>>>> callbacks -
>>>>>>>>>> >             one for each timer. A number of users kept hitting
>>>>>>>>>> stumbling
>>>>>>>>>> >             blocks where they needed a dynamic set of timers
>>>>>>>>>> (often
>>>>>>>>>> >             based on the element), which was not supported in
>>>>>>>>>> Beam. The
>>>>>>>>>> >             workarounds were quite ugly and complicated.
>>>>>>>>>> >
>>>>>>>>>> >             The new support allows declaring a TimerMap, which
>>>>>>>>>> is a map
>>>>>>>>>> >             of timers. Each TimerMap is scoped by a family
>>>>>>>>>> name, so you
>>>>>>>>>> >             can create multiple TimerMaps each with its own
>>>>>>>>>> callback.
>>>>>>>>>> >             The use looks as follows:
>>>>>>>>>> >
>>>>>>>>>> >             class MyDoFn extends DoFn<...> {
>>>>>>>>>> >                 @TimerFamily("timers")
>>>>>>>>>> >                 private final TimerSpec timerMap =
>>>>>>>>>> >             TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>>>>> >
>>>>>>>>>> >                 @ProcessElement
>>>>>>>>>> >                  public void process(@TimerFamily("timers")
>>>>>>>>>> TimerMap
>>>>>>>>>> >             timers, @Element Type e) {
>>>>>>>>>> >                     timers.set("mainTimer", timestamp);
>>>>>>>>>> >                     timers.set("actionType" +
>>>>>>>>>> e.getActionType(), timestamp);
>>>>>>>>>> >                 }
>>>>>>>>>> >
>>>>>>>>>> >                @OnTimerFamily("timers")
>>>>>>>>>> >                public void onTimer(@TimerId String timerId) {
>>>>>>>>>> >                   System.out.println("Timer fired. id: " +
>>>>>>>>>> timerId);
>>>>>>>>>> >                }
>>>>>>>>>> >             }
>>>>>>>>>> >
>>>>>>>>>> >             This currently works for the Flink and the Dataflow
>>>>>>>>>> runners.
>>>>>>>>>> >
>>>>>>>>>> >             Thank you Rehman for getting this done! Beam users
>>>>>>>>>> will find
>>>>>>>>>> >             it very valuable.
>>>>>>>>>> >
>>>>>>>>>> >             Reuven
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>
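
The overwrite rule quoted in this thread (a second timer set with the same id
replaces the first, and the timer family scopes the ids) can be sketched in
plain Java. This is a minimal model under stated assumptions, not Beam or
runner code: the class TimerFamilySketch, its TimerKey, and the methods
setTimer, getTimer and pendingCount are hypothetical names chosen only for
illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical model, not Beam code: shows why a runner-side map key
// needs the timer family id as well as the timer id.
final class TimerFamilySketch {

    // Composite key (family id, timer id). Both field names are illustrative.
    static final class TimerKey {
        final String familyId;
        final String timerId;

        TimerKey(String familyId, String timerId) {
            this.familyId = familyId;
            this.timerId = timerId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TimerKey)) {
                return false;
            }
            TimerKey other = (TimerKey) o;
            return familyId.equals(other.familyId) && timerId.equals(other.timerId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(familyId, timerId);
        }
    }

    // Pending timers: key -> firing timestamp in milliseconds.
    private final Map<TimerKey, Long> pending = new HashMap<>();

    // Setting a timer again with the same (family, id) overwrites the earlier one.
    void setTimer(String familyId, String timerId, long timestampMillis) {
        pending.put(new TimerKey(familyId, timerId), timestampMillis);
    }

    // Returns the pending timestamp, or null if no such timer is set.
    Long getTimer(String familyId, String timerId) {
        return pending.get(new TimerKey(familyId, timerId));
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        TimerFamilySketch timers = new TimerFamilySketch();
        timers.setTimer("familyA", "mainTimer", 100L);
        timers.setTimer("familyA", "mainTimer", 200L); // same family and id: overwrites
        timers.setTimer("familyB", "mainTimer", 300L); // same id, other family: kept
        System.out.println(timers.pendingCount());
        System.out.println(timers.getTimer("familyA", "mainTimer"));
    }
}
```

If the key were the timer id alone, the familyB timer in this sketch would
overwrite the familyA one, which is the kind of collision the thread
describes for a map that ignores the family name.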

Re: Dynamic timers now supported!

Posted by Reuven Lax <re...@google.com>.
Ismael,

As part of that fix I added some new tests so that these tests run on both
batch and streaming runners, since I realized that the test was previously
running only on batch runners. I did this by explicitly setting the
isBounded attribute on the output of the Create transform. Somehow these
new tests I added are making the Flink runner unhappy.

I'm not sure why explicitly setting the PCollection to be unbounded is
breaking on the Flink runner. We can try to exclude the Flink runner from
these tests for now, but maybe Max has an idea.

Also Ismael, what makes this a batch mode test? When I look at the failing
stack trace, the failure is in FlinkStreamingPipelineTranslator.

Reuven

On Fri, Feb 14, 2020 at 8:52 AM Ismaël Mejía <ie...@gmail.com> wrote:

> Apparently the fix of Dynamic Timers for Dataflow broke the
> ValidatesRunner tests for Flink in batch mode that were passing before.
> Can you please take a look, Reuven or Rehman?
> Tests are failing since the exact commit for the fix:
> 7719708a04d5d0eff3048dbd58ac1337889f8ba5
> For details on the exception:
>
> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/6608/
>
>
> On Wed, Feb 12, 2020 at 10:37 AM Ismaël Mejía <ie...@gmail.com> wrote:
>
>> Great to know you get it working on Dataflow easily Reuven. As a new
>> feature it
>> looks great!
>>
>> Agree with Kenn maybe worth to open a new thread to discuss the changes
>> still
>> needed to support this in portable runners.
>>
>> On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>> I think the (lack of) portability bit may have been buried in this
>>> thread. Maybe a new thread about the design for that?
>>>
>>> Kenn
>>>
>>> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> FYI, this is now fixed for Dataflow. I also added better rejection so
>>>> that runners that don't support this feature will reject the pipeline.
>>>>
>>>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>>>
>>>>> A larger question is how to support this in the portability layer.
>>>>> Right now portability assumes that each timer id corresponds to a logical
>>>>> input PCollection, but that assumption no longer works as we now support a
>>>>> dynamic set of timers, each with their own id. We could instead model each
>>>>> timer family as a PCollection, but the FnApiRunner would need to
>>>>> dynamically get the timer id in order to invoke it, and today it statically
>>>>> reads the timer id from the PCollection name.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>>>>>> indeed never ran on any runner except for the DirectRunner, which is
>>>>>> something I should've noticed in the code review.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <ie...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I had a discussion with Rehman last week and we discovered that the
>>>>>>> TimersMap
>>>>>>> related tests were not running for all runners because they were not
>>>>>>> tagged as
>>>>>>> part of the ValidatesRunner category. I opened a PR [1] to enable
>>>>>>> this, so
>>>>>>> please someone help me with the review/merge.
>>>>>>>
>>>>>>> I took a look just for curiosity and discovered that they are only
>>>>>>> passing for
>>>>>>> Direct runner and for the classic Flink runner in batch mode. They
>>>>>>> are not
>>>>>>> passing for Dataflow [2][3] and for the Portable Flink runner, so
>>>>>>> probably worth
>>>>>>> to reopen the issue to investigate/fix.
>>>>>>>
>>>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>>>> [2]
>>>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>>>> [3]
>>>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Yes. For now we exclude the flink runner, but fixing this should be
>>>>>>>> fairly trivial.
>>>>>>>>
>>>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <mx...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> The Flink Runner was allowing to set a timer multiple times before
>>>>>>>>> we
>>>>>>>>> made it comply with the Beam semantics of overwriting past
>>>>>>>>> invocations.
>>>>>>>>> I wouldn't be surprised if the Spark Runner never addressed this.
>>>>>>>>> Flink
>>>>>>>>> and Spark themselves allow a timer to be set multiple times. In
>>>>>>>>> order
>>>>>>>>> to fix this for Beam, the Flink Runner has to maintain a
>>>>>>>>> checkpointed
>>>>>>>>> map which sits outside of its builtin TimerService.
>>>>>>>>>
>>>>>>>>> As far as I can see, multiple timer families are currently not
>>>>>>>>> supported
>>>>>>>>> in the Flink Runner due to the map not taking the family name into
>>>>>>>>> account. This can be easily fixed though.
>>>>>>>>>
>>>>>>>>> -Max
>>>>>>>>>
>>>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>>>> > The new timer family is in the portability protos. I think
>>>>>>>>> TimerReceiver
>>>>>>>>> > needs to be updated to set it though (I think a 1-line change).
>>>>>>>>> >
>>>>>>>>> > The TimerInternals class that runners implement today already
>>>>>>>>> handles
>>>>>>>>> > dynamic timers, so most of the work was in the Beam SDK  to
>>>>>>>>> provide an
>>>>>>>>> > API that allows users to access this feature.
>>>>>>>>> >
>>>>>>>>> > The main work needed in the runner was to take into account the
>>>>>>>>> timer
>>>>>>>>> > family. Beam semantics say that if a timer is set twice with the
>>>>>>>>> same
>>>>>>>>> > id, then the second timer overwrites the first.  Several runners
>>>>>>>>> > therefore had maps from timer id -> timer. However since the
>>>>>>>>> > timer family scopes the timers, we now allow two timers with the
>>>>>>>>> same id
>>>>>>>>> > as long as the timer families are different. Runners had to be
>>>>>>>>> updated
>>>>>>>>> > to include the timer family id in the map keys.
>>>>>>>>> >
>>>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>>>>>>> wonder
>>>>>>>>> > if this means that the Spark runner was incorrectly implementing
>>>>>>>>> the
>>>>>>>>> > Beam semantics before, and setTimer was not overwriting timers
>>>>>>>>> with the
>>>>>>>>> > same id?
>>>>>>>>> >
>>>>>>>>> > Reuven
>>>>>>>>> >
>>>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <iemejia@gmail.com
>>>>>>>>> > <ma...@gmail.com>> wrote:
>>>>>>>>> >
>>>>>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>>>>>> >
>>>>>>>>> >     I have some questions (note I have not looked at the code at
>>>>>>>>> all).
>>>>>>>>> >
>>>>>>>>> >     - Is this working for both portable and non portable runners?
>>>>>>>>> >     - What do other runners need to implement to support this
>>>>>>>>> (e.g. Spark)?
>>>>>>>>> >
>>>>>>>>> >     Maybe worth to add this to the website Compatibility Matrix.
>>>>>>>>> >
>>>>>>>>> >     Regards,
>>>>>>>>> >     Ismaël
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>>>>>> >     <rehman.muradali@venturedive.com
>>>>>>>>> >     <ma...@venturedive.com>> wrote:
>>>>>>>>> >
>>>>>>>>> >         Thank you Reuven for the guidance throughout the
>>>>>>>>> development
>>>>>>>>> >         process. I am delighted to contribute my two cents to
>>>>>>>>> the Beam
>>>>>>>>> >         project.
>>>>>>>>> >
>>>>>>>>> >         Looking forward to more active contributions.
>>>>>>>>> >
>>>>>>>>> >         *Thanks & Regards*
>>>>>>>>> >
>>>>>>>>> >         *Rehman Murad Ali*
>>>>>>>>> >         Software Engineer
>>>>>>>>> >         Mobile: +92 3452076766
>>>>>>>>> >         Skype: rehman.muradali
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <
>>>>>>>>> relax@google.com
>>>>>>>>> >         <ma...@google.com>> wrote:
>>>>>>>>> >
>>>>>>>>> >             Thanks to a lot of hard work by Rehman, Beam now
>>>>>>>>> supports
>>>>>>>>> >             dynamic timers. As a reminder, this was discussed on
>>>>>>>>> the dev
>>>>>>>>> >             list some time back.
>>>>>>>>> >
>>>>>>>>> >             As background, previously one had to statically
>>>>>>>>> declare all
>>>>>>>>> >             timers in your code. So if you wanted to have two
>>>>>>>>> timers,
>>>>>>>>> >             you needed to create two timer variables and two
>>>>>>>>> callbacks -
>>>>>>>>> >             one for each timer. A number of users kept hitting
>>>>>>>>> stumbling
>>>>>>>>> >             blocks where they needed a dynamic set of timers
>>>>>>>>> (often
>>>>>>>>> >             based on the element), which was not supported in
>>>>>>>>> Beam. The
>>>>>>>>> >             workarounds were quite ugly and complicated.
>>>>>>>>> >
>>>>>>>>> >             The new support allows declaring a TimerMap, which
>>>>>>>>> is a map
>>>>>>>>> >             of timers. Each TimerMap is scoped by a family name,
>>>>>>>>> so you
>>>>>>>>> >             can create multiple TimerMaps each with its own
>>>>>>>>> callback.
>>>>>>>>> >             The use looks as follows:
>>>>>>>>> >
>>>>>>>>> >             class MyDoFn extends DoFn<...> {
>>>>>>>>> >                 @TimerFamily("timers")
>>>>>>>>> >                 private final TimerSpec timerMap =
>>>>>>>>> >             TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>>>> >
>>>>>>>>> >                 @ProcessElement
>>>>>>>>> >                  public void process(@TimerFamily("timers")
>>>>>>>>> TimerMap
>>>>>>>>> >             timers, @Element Type e) {
>>>>>>>>> >                     timers.set("mainTimer", timestamp);
>>>>>>>>> >                     timers.set("actionType" + e.getActionType(),
>>>>>>>>> timestamp);
>>>>>>>>> >                 }
>>>>>>>>> >
>>>>>>>>> >                @OnTimerFamily("timers")
>>>>>>>>> >                public void onTimer(@TimerId String timerId) {
>>>>>>>>> >                   System.out.println("Timer fired. id: " +
>>>>>>>>> timerId);
>>>>>>>>> >                }
>>>>>>>>> >             }
>>>>>>>>> >
>>>>>>>>> >             This currently works for the Flink and the Dataflow
>>>>>>>>> runners.
>>>>>>>>> >
>>>>>>>>> >             Thank you Rehman for getting this done! Beam users
>>>>>>>>> will find
>>>>>>>>> >             it very valuable.
>>>>>>>>> >
>>>>>>>>> >             Reuven
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>
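
The portability question quoted above (one callback per timer family, with
the timer id obtained dynamically when a timer fires rather than read
statically from a PCollection name) can be sketched in plain Java. This is
only an illustration under stated assumptions, not Beam or FnApiRunner code:
FamilyDispatchSketch, register and fire are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model, not Beam or FnApiRunner code.
final class FamilyDispatchSketch {

    // One callback per timer family, known statically, in the spirit of one
    // @OnTimerFamily method per declared family.
    private final Map<String, Consumer<String>> callbacksByFamily = new HashMap<>();

    void register(String familyId, Consumer<String> onTimer) {
        callbacksByFamily.put(familyId, onTimer);
    }

    // The fired timer carries its id as data, so the set of timer ids
    // does not have to be known when the pipeline is constructed.
    void fire(String familyId, String timerId) {
        Consumer<String> callback = callbacksByFamily.get(familyId);
        if (callback == null) {
            // Reject an unknown family rather than dropping the timer silently.
            throw new IllegalStateException(
                "No callback registered for timer family: " + familyId);
        }
        callback.accept(timerId);
    }

    public static void main(String[] args) {
        FamilyDispatchSketch dispatcher = new FamilyDispatchSketch();
        List<String> fired = new ArrayList<>();
        dispatcher.register("timers", fired::add);
        dispatcher.fire("timers", "mainTimer");
        dispatcher.fire("timers", "actionTypeCLICK");
        System.out.println(fired);
    }
}
```

In this sketch only the family ids are fixed ahead of time; the timer ids
arrive with each firing, which is the dynamic lookup the quoted message says
the portability layer would need.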

Re: Dynamic timers now supported!

Posted by Luke Cwik <lc...@google.com>.
+1 for portable implementation and design. Having features only developed
using the non-portable implementation in mind will mean that the
portability effort gets bogged down with filling in features that were
partially completed.

On Fri, Feb 14, 2020 at 8:52 AM Ismaël Mejía <ie...@gmail.com> wrote:

> Apparently the fix of Dynamic Timers for Dataflow broke the
> ValidatesRunner tests for Flink in batch mode that were passing before.
> Can you please take a look, Reuven or Rehman?
> Tests are failing since the exact commit for the fix:
> 7719708a04d5d0eff3048dbd58ac1337889f8ba5
> For details on the exception:
>
> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/6608/
>
>
> On Wed, Feb 12, 2020 at 10:37 AM Ismaël Mejía <ie...@gmail.com> wrote:
>
>> Great to know you get it working on Dataflow easily Reuven. As a new
>> feature it
>> looks great!
>>
>> Agree with Kenn maybe worth to open a new thread to discuss the changes
>> still
>> needed to support this in portable runners.
>>
>> On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>> I think the (lack of) portability bit may have been buried in this
>>> thread. Maybe a new thread about the design for that?
>>>
>>> Kenn
>>>
>>> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> FYI, this is now fixed for Dataflow. I also added better rejection so
>>>> that runners that don't support this feature will reject the pipeline.
>>>>
>>>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>>>
>>>>> A larger question is how to support this in the portability layer.
>>>>> Right now portability assumes that each timer id corresponds to a logical
>>>>> input PCollection, but that assumption no longer works as we now support a
>>>>> dynamic set of timers, each with their own id. We could instead model each
>>>>> timer family as a PCollection, but the FnApiRunner would need to
>>>>> dynamically get the timer id in order to invoke it, and today it statically
>>>>> reads the timer id from the PCollection name.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>>>>>> indeed never ran on any runner except for the DirectRunner, which is
>>>>>> something I should've noticed in the code review.
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <ie...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I had a discussion with Rehman last week and we discovered that the
>>>>>>> TimersMap
>>>>>>> related tests were not running for all runners because they were not
>>>>>>> tagged as
>>>>>>> part of the ValidatesRunner category. I opened a PR [1] to enable
>>>>>>> this, so
>>>>>>> please someone help me with the review/merge.
>>>>>>>
>>>>>>> I took a look just for curiosity and discovered that they are only
>>>>>>> passing for
>>>>>>> Direct runner and for the classic Flink runner in batch mode. They
>>>>>>> are not
>>>>>>> passing for Dataflow [2][3] and for the Portable Flink runner, so
>>>>>>> probably worth
>>>>>>> to reopen the issue to investigate/fix.
>>>>>>>
>>>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>>>> [2]
>>>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>>>> [3]
>>>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Yes. For now we exclude the flink runner, but fixing this should be
>>>>>>>> fairly trivial.
>>>>>>>>
>>>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <mx...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> The Flink Runner was allowing to set a timer multiple times before
>>>>>>>>> we
>>>>>>>>> made it comply with the Beam semantics of overwriting past
>>>>>>>>> invocations.
>>>>>>>>> I wouldn't be surprised if the Spark Runner never addressed this.
>>>>>>>>> Flink
>>>>>>>>> and Spark themselves allow a timer to be set multiple times. In
>>>>>>>>> order
>>>>>>>>> to fix this for Beam, the Flink Runner has to maintain a
>>>>>>>>> checkpointed
>>>>>>>>> map which sits outside of its builtin TimerService.
>>>>>>>>>
>>>>>>>>> As far as I can see, multiple timer families are currently not
>>>>>>>>> supported
>>>>>>>>> in the Flink Runner due to the map not taking the family name into
>>>>>>>>> account. This can be easily fixed though.
>>>>>>>>>
>>>>>>>>> -Max
>>>>>>>>>
>>>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>>>> > The new timer family is in the portability protos. I think
>>>>>>>>> TimerReceiver
>>>>>>>>> > needs to be updated to set it though (I think a 1-line change).
>>>>>>>>> >
>>>>>>>>> > The TimerInternals class that runners implement today already
>>>>>>>>> handles
>>>>>>>>> > dynamic timers, so most of the work was in the Beam SDK  to
>>>>>>>>> provide an
>>>>>>>>> > API that allows users to access this feature.
>>>>>>>>> >
>>>>>>>>> > The main work needed in the runner was to take into account the
>>>>>>>>> timer
>>>>>>>>> > family. Beam semantics say that if a timer is set twice with the
>>>>>>>>> same
>>>>>>>>> > id, then the second timer overwrites the first.  Several runners
>>>>>>>>> > therefore had maps from timer id -> timer. However since the
>>>>>>>>> > timer family scopes the timers, we now allow two timers with the
>>>>>>>>> same id
>>>>>>>>> > as long as the timer families are different. Runners had to be
>>>>>>>>> updated
>>>>>>>>> > to include the timer family id in the map keys.
>>>>>>>>> >
>>>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>>>>>>> wonder
>>>>>>>>> > if this means that the Spark runner was incorrectly implementing
>>>>>>>>> the
>>>>>>>>> > Beam semantics before, and setTimer was not overwriting timers
>>>>>>>>> with the
>>>>>>>>> > same id?
>>>>>>>>> >
>>>>>>>>> > Reuven
>>>>>>>>> >
>>>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <iemejia@gmail.com
>>>>>>>>> > <ma...@gmail.com>> wrote:
>>>>>>>>> >
>>>>>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>>>>>> >
>>>>>>>>> >     I have some questions (note I have not looked at the code at
>>>>>>>>> all).
>>>>>>>>> >
>>>>>>>>> >     - Is this working for both portable and non portable runners?
>>>>>>>>> >     - What do other runners need to implement to support this
>>>>>>>>> (e.g. Spark)?
>>>>>>>>> >
>>>>>>>>> >     Maybe worth to add this to the website Compatibility Matrix.
>>>>>>>>> >
>>>>>>>>> >     Regards,
>>>>>>>>> >     Ismaël
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>>>>>> >     <rehman.muradali@venturedive.com
>>>>>>>>> >     <ma...@venturedive.com>> wrote:
>>>>>>>>> >
>>>>>>>>> >         Thank you Reuven for the guidance throughout the
>>>>>>>>> development
>>>>>>>>> >         process. I am delighted to contribute my two cents to
>>>>>>>>> the Beam
>>>>>>>>> >         project.
>>>>>>>>> >
>>>>>>>>> >         Looking forward to more active contributions.
>>>>>>>>> >
>>>>>>>>> >         *Thanks & Regards*
>>>>>>>>> >
>>>>>>>>> >         *Rehman Murad Ali*
>>>>>>>>> >         Software Engineer
>>>>>>>>> >         Mobile: +92 3452076766
>>>>>>>>> >         Skype: rehman.muradali
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >
>>>>>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <
>>>>>>>>> relax@google.com
>>>>>>>>> >         <ma...@google.com>> wrote:
>>>>>>>>> >
>>>>>>>>> >             Thanks to a lot of hard work by Rehman, Beam now
>>>>>>>>> supports
>>>>>>>>> >             dynamic timers. As a reminder, this was discussed on
>>>>>>>>> the dev
>>>>>>>>> >             list some time back.
>>>>>>>>> >
>>>>>>>>> >             As background, previously one had to statically
>>>>>>>>> declare all
>>>>>>>>> >             timers in your code. So if you wanted to have two
>>>>>>>>> timers,
>>>>>>>>> >             you needed to create two timer variables and two
>>>>>>>>> callbacks -
>>>>>>>>> >             one for each timer. A number of users kept hitting
>>>>>>>>> stumbling
>>>>>>>>> >             blocks where they needed a dynamic set of timers
>>>>>>>>> (often
>>>>>>>>> >             based on the element), which was not supported in
>>>>>>>>> Beam. The
>>>>>>>>> >             workarounds were quite ugly and complicated.
>>>>>>>>> >
>>>>>>>>> >             The new support allows declaring a TimerMap, which
>>>>>>>>> is a map
>>>>>>>>> >             of timers. Each TimerMap is scoped by a family name,
>>>>>>>>> so you
>>>>>>>>> >             can create multiple TimerMaps each with its own
>>>>>>>>> callback.
>>>>>>>>> >             The use looks as follows:
>>>>>>>>> >
>>>>>>>>> >             class MyDoFn extends DoFn<...> {
>>>>>>>>> >                 @TimerFamily("timers")
>>>>>>>>> >                 private final TimerSpec timerMap =
>>>>>>>>> >             TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>>>> >
>>>>>>>>> >                 @ProcessElement
>>>>>>>>> >                  public void process(@TimerFamily("timers")
>>>>>>>>> TimerMap
>>>>>>>>> >             timers, @Element Type e) {
>>>>>>>>> >                     timers.set("mainTimer", timestamp);
>>>>>>>>> >                     timers.set("actionType" + e.getActionType(),
>>>>>>>>> timestamp);
>>>>>>>>> >                 }
>>>>>>>>> >
>>>>>>>>> >                @OnTimerFamily("timers")
>>>>>>>>> >                public void onTimer(@TimerId String timerId) {
>>>>>>>>> >                   System.out.println("Timer fired. id: " +
>>>>>>>>> timerId);
>>>>>>>>> >                }
>>>>>>>>> >             }
>>>>>>>>> >
>>>>>>>>> >             This currently works for the Flink and the Dataflow
>>>>>>>>> runners.
>>>>>>>>> >
>>>>>>>>> >             Thank you Rehman for getting this done! Beam users
>>>>>>>>> will find
>>>>>>>>> >             it very valuable.
>>>>>>>>> >
>>>>>>>>> >             Reuven
>>>>>>>>> >
>>>>>>>>>
>>>>>>>>

Re: Dynamic timers now supported!

Posted by Ismaël Mejía <ie...@gmail.com>.
Apparently the fix of Dynamic Timers for Dataflow broke the ValidatesRunner
tests for Flink in batch mode that were passing before.
Can you please take a look, Reuven or Rehman?
Tests have been failing since the exact commit of the fix:
7719708a04d5d0eff3048dbd58ac1337889f8ba5
For details on the exception:
https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/6608/


On Wed, Feb 12, 2020 at 10:37 AM Ismaël Mejía <ie...@gmail.com> wrote:

> Great to know you got it working on Dataflow so easily, Reuven. As a new
> feature it looks great!
>
> I agree with Kenn: it is probably worth opening a new thread to discuss
> the changes still needed to support this in portable runners.
>
> On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>> I think the (lack of) portability bit may have been buried in this
>> thread. Maybe a new thread about the design for that?
>>
>> Kenn
>>
>> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <re...@google.com> wrote:
>>
>>> FYI, this is now fixed for Dataflow. I also added better rejection so
>>> that runners that don't support this feature will reject the pipeline.
>>>
>>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>>
>>>> A larger question is how to support this in the portability layer.
>>>> Right now portability assumes that each timer id corresponds to a logical
>>>> input PCollection, but that assumption no longer works as we now support a
>>>> dynamic set of timers, each with their own id. We could instead model each
>>>> timer family as a PCollection, but the FnApiRunner would need to
>>>> dynamically get the timer id in order to invoke it, and today it statically
>>>> reads the timer id from the PCollection name.
>>>>
>>>> Reuven
>>>>
>>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>>>>> indeed never ran on any runner except for the DirectRunner, which is
>>>>> something I should've noticed in the code review.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <ie...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I had a discussion with Rehman last week and we discovered that the
>>>>>> TimerMap-related tests were not running for all runners because they
>>>>>> were not tagged as part of the ValidatesRunner category. I opened a
>>>>>> PR [1] to enable this, so could someone please help me with the
>>>>>> review/merge?
>>>>>>
>>>>>> I took a look out of curiosity and discovered that they only pass for
>>>>>> the Direct runner and for the classic Flink runner in batch mode. They
>>>>>> do not pass for Dataflow [2][3] or for the portable Flink runner, so it
>>>>>> is probably worth reopening the issue to investigate/fix.
>>>>>>
>>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>>> [2]
>>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>>> [3]
>>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>>
>>>>>>
>>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Yes. For now we exclude the flink runner, but fixing this should be
>>>>>>> fairly trivial.
>>>>>>>
>>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <mx...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> The Flink Runner was allowing to set a timer multiple times before
>>>>>>>> we
>>>>>>>> made it comply with the Beam semantics of overwriting past
>>>>>>>> invocations.
>>>>>>>> I wouldn't be surprised if the Spark Runner never addressed this.
>>>>>>>> Flink
>>>>>>>> and Spark itself allow for a timer to be set to multiple times. In
>>>>>>>> order
>>>>>>>> to fix this for Beam, the Flink Runner has to maintain a
>>>>>>>> checkpointed
>>>>>>>> map which sits outside of its builtin TimerService.
>>>>>>>>
>>>>>>>> As far as I can see, multiple timer families are currently not
>>>>>>>> supported
>>>>>>>> in the Flink Runner due to the map not taking the family name into
>>>>>>>> account. This can be easily fixed though.
>>>>>>>>
>>>>>>>> -Max
>>>>>>>>
>>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>>> > The new timer family is in the portability protos. I think
>>>>>>>> TimerReceiver
>>>>>>>> > needs to be updated to set it though (I think a 1-line change).
>>>>>>>> >
>>>>>>>> > The TimerInternals class that runners implement today already
>>>>>>>> handles
>>>>>>>> > dynamic timers, so most of the work was in the Beam SDK  to
>>>>>>>> provide an
>>>>>>>> > API that allows users to access this feature.
>>>>>>>> >
>>>>>>>> > The main work needed in the runner was to take in account the
>>>>>>>> timer
>>>>>>>> > family. Beam semantics say that if a timer is set twice with the
>>>>>>>> same
>>>>>>>> > id, then the second timer overwrites the first.  Several runners
>>>>>>>> > therefore had maps from timer id -> timer. However since the
>>>>>>>> > timer family scopes the timers, we now allow two timers with the
>>>>>>>> same id
>>>>>>>> > as long as the timer families are different. Runners had to be
>>>>>>>> updated
>>>>>>>> > to include the timer family id in the map keys.
>>>>>>>> >
>>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>>>>>> wonder
>>>>>>>> > if this means that the Spark runner was incorrectly implementing
>>>>>>>> the
>>>>>>>> > Beam semantics before, and setTimer was not overwriting timers
>>>>>>>> with the
>>>>>>>> > same id?
>>>>>>>> >
>>>>>>>> > Reuven
>>>>>>>> >
>>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <iemejia@gmail.com
>>>>>>>> > <ma...@gmail.com>> wrote:
>>>>>>>> >
>>>>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>>>>> >
>>>>>>>> >     I have some questions (note I have not looked at the code at
>>>>>>>> all).
>>>>>>>> >
>>>>>>>> >     - Is this working for both portable and non portable runners?
>>>>>>>> >     - What do other runners need to implement to support this
>>>>>>>> (e.g. Spark)?
>>>>>>>> >
>>>>>>>> >     Maybe worth to add this to the website Compatibility Matrix.
>>>>>>>> >
>>>>>>>> >     Regards,
>>>>>>>> >     Ismaël
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>>>>> >     <rehman.muradali@venturedive.com
>>>>>>>> >     <ma...@venturedive.com>> wrote:
>>>>>>>> >
>>>>>>>> >         Thank you Reuven for the guidance throughout the
>>>>>>>> development
>>>>>>>> >         process. I am delighted to contribute my two cents to the
>>>>>>>> Beam
>>>>>>>> >         project.
>>>>>>>> >
>>>>>>>> >         Looking forward to more active contributions.
>>>>>>>> >
>>>>>>>> >         *
>>>>>>>> >         *
>>>>>>>> >
>>>>>>>> >         *Thanks & Regards____*
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >         *Rehman Murad Ali*
>>>>>>>> >         Software Engineer
>>>>>>>> >         Mobile: +92 3452076766 <+92%20345%202076766>
>>>>>>>> <tel:+92%20345%202076766>
>>>>>>>> >         Skype: rehman.muradali
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >
>>>>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <
>>>>>>>> relax@google.com
>>>>>>>> >         <ma...@google.com>> wrote:
>>>>>>>> >
>>>>>>>> >             Thanks to a lot of hard work by Rehman, Beam now
>>>>>>>> supports
>>>>>>>> >             dynamic timers. As a reminder, this was discussed on
>>>>>>>> the dev
>>>>>>>> >             list some time back.
>>>>>>>> >
>>>>>>>> >             As background, previously one had to statically
>>>>>>>> declare all
>>>>>>>> >             timers in your code. So if you wanted to have two
>>>>>>>> timers,
>>>>>>>> >             you needed to create two timer variables and two
>>>>>>>> callbacks -
>>>>>>>> >             one for each timer. A number of users kept hitting
>>>>>>>> stumbling
>>>>>>>> >             blocks where they needed a dynamic set of timers
>>>>>>>> (often
>>>>>>>> >             based on the element), which was not supported in
>>>>>>>> Beam. The
>>>>>>>> >             workarounds were quite ugly and complicated.
>>>>>>>> >
>>>>>>>> >             The new support allows declaring a TimerMap, which is
>>>>>>>> a map
>>>>>>>> >             of timers. Each TimerMap is scoped by a family name,
>>>>>>>> so you
>>>>>>>> >             can create multiple TimerMaps each with its own
>>>>>>>> callback.
>>>>>>>> >             The use looks as follows:
>>>>>>>> >
>>>>>>>> >             class MyDoFn extends DoFn<...> {
>>>>>>>> >                 @TimerFamily("timers")
>>>>>>>> >                 private final TimerSpec timerMap =
>>>>>>>> >                     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>>> >
>>>>>>>> >                 @ProcessElement
>>>>>>>> >                 public void process(
>>>>>>>> >                     @TimerFamily("timers") TimerMap timers, @Element Type e) {
>>>>>>>> >                     timers.set("mainTimer", timestamp);
>>>>>>>> >                     timers.set("actionType" + e.getActionType(), timestamp);
>>>>>>>> >                 }
>>>>>>>> >
>>>>>>>> >                 @OnTimerFamily("timers")
>>>>>>>> >                 public void onTimer(@TimerId String timerId) {
>>>>>>>> >                     System.out.println("Timer fired. id: " + timerId);
>>>>>>>> >                 }
>>>>>>>> >             }
>>>>>>>> >
>>>>>>>> >             This currently works for the Flink and the Dataflow
>>>>>>>> runners.
>>>>>>>> >
>>>>>>>> >             Thank you Rehman for getting this done! Beam users
>>>>>>>> will find
>>>>>>>> >             it very valuable.
>>>>>>>> >
>>>>>>>> >             Reuven
>>>>>>>> >
>>>>>>>>
>>>>>>>
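
Reuven's note quoted above says that runners kept maps from timer id to
timer, and had to start including the timer family id in the map keys. A
minimal sketch of that keying scheme follows. It is plain Java for
illustration only: the class FamilyScopedTimers and its methods are
hypothetical names, not part of Beam or of any runner, and timestamps are
simplified to long milliseconds.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration only; not Beam's or any runner's actual timer store. */
final class FamilyScopedTimers {
  // family id -> (timer id -> firing timestamp in millis)
  private final Map<String, Map<String, Long>> timers = new HashMap<>();

  /** Sets a timer; a second set with the same (family, id) pair overwrites the first. */
  void set(String familyId, String timerId, long timestampMillis) {
    timers.computeIfAbsent(familyId, f -> new HashMap<>()).put(timerId, timestampMillis);
  }

  /** Returns the pending timestamp, or null if no such timer is set. */
  Long get(String familyId, String timerId) {
    Map<String, Long> family = timers.get(familyId);
    return family == null ? null : family.get(timerId);
  }

  /** Number of pending timers across all families. */
  int size() {
    int count = 0;
    for (Map<String, Long> family : timers.values()) {
      count += family.size();
    }
    return count;
  }
}

final class FamilyScopedTimersDemo {
  public static void main(String[] args) {
    FamilyScopedTimers timers = new FamilyScopedTimers();
    timers.set("familyA", "mainTimer", 100L);
    timers.set("familyA", "mainTimer", 200L); // same family and id: overwrites
    timers.set("familyB", "mainTimer", 300L); // same id, other family: kept separately
    System.out.println(timers.get("familyA", "mainTimer")); // prints 200
    System.out.println(timers.get("familyB", "mainTimer")); // prints 300
    System.out.println(timers.size()); // prints 2
  }
}
```

If the map were keyed by timer id alone, the third call would overwrite
the timer set in familyA, which is the kind of bug Max describes for the
Flink runner's map.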

Re: Dynamic timers now supported!

Posted by Ismaël Mejía <ie...@gmail.com>.
Great to know you got it working on Dataflow so easily, Reuven. As a new
feature it looks great!

I agree with Kenn: it is probably worth opening a new thread to discuss the
changes still needed to support this in portable runners.

On Mon, Feb 10, 2020 at 8:25 PM Kenneth Knowles <ke...@apache.org> wrote:

> I think the (lack of) portability bit may have been buried in this thread.
> Maybe a new thread about the design for that?
>
> Kenn
>
> On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <re...@google.com> wrote:
>
>> FYI, this is now fixed for Dataflow. I also added better rejection so
>> that runners that don't support this feature will reject the pipeline.
>>
>> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>>
>>> I took a look, and I think this was a simple bug. Testing a fix now.
>>>
>>> A larger question is how to support this in the portability layer. Right
>>> now portability assumes that each timer id corresponds to a logical input
>>> PCollection, but that assumption no longer works as we now support a
>>> dynamic set of timers, each with their own id. We could instead model each
>>> timer family as a PCollection, but the FnApiRunner would need to
>>> dynamically get the timer id in order to invoke it, and today it statically
>>> reads the timer id from the PCollection name.
>>>
>>> Reuven
>>>
>>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>>>> indeed never ran on any runner except for the DirectRunner, which is
>>>> something I should've noticed in the code review.
>>>>
>>>> Reuven
>>>>
>>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <ie...@gmail.com> wrote:
>>>>
>>>>> I had a discussion with Rehman last week and we discovered that the
>>>>> TimerMap-related tests were not running for all runners because they
>>>>> were not tagged as part of the ValidatesRunner category. I opened a
>>>>> PR [1] to enable this, so could someone please help me with the
>>>>> review/merge?
>>>>>
>>>>> I took a look out of curiosity and discovered that they only pass for
>>>>> the Direct runner and for the classic Flink runner in batch mode. They
>>>>> do not pass for Dataflow [2][3] or for the portable Flink runner, so it
>>>>> is probably worth reopening the issue to investigate/fix.
>>>>>
>>>>> [1] https://github.com/apache/beam/pull/10747
>>>>> [2]
>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>>> [3]
>>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>>
>>>>>
>>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Yes. For now we exclude the flink runner, but fixing this should be
>>>>>> fairly trivial.
>>>>>>
>>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <mx...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> The Flink Runner was allowing to set a timer multiple times before
>>>>>>> we
>>>>>>> made it comply with the Beam semantics of overwriting past
>>>>>>> invocations.
>>>>>>> I wouldn't be surprised if the Spark Runner never addressed this.
>>>>>>> Flink
>>>>>>> and Spark itself allow for a timer to be set to multiple times. In
>>>>>>> order
>>>>>>> to fix this for Beam, the Flink Runner has to maintain a
>>>>>>> checkpointed
>>>>>>> map which sits outside of its builtin TimerService.
>>>>>>>
>>>>>>> As far as I can see, multiple timer families are currently not
>>>>>>> supported
>>>>>>> in the Flink Runner due to the map not taking the family name into
>>>>>>> account. This can be easily fixed though.
>>>>>>>
>>>>>>> -Max
>>>>>>>
>>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>>> > The new timer family is in the portability protos. I think
>>>>>>> TimerReceiver
>>>>>>> > needs to be updated to set it though (I think a 1-line change).
>>>>>>> >
>>>>>>> > The TimerInternals class that runners implement today already
>>>>>>> handles
>>>>>>> > dynamic timers, so most of the work was in the Beam SDK  to
>>>>>>> provide an
>>>>>>> > API that allows users to access this feature.
>>>>>>> >
>>>>>>> > The main work needed in the runner was to take in account the
>>>>>>> timer
>>>>>>> > family. Beam semantics say that if a timer is set twice with the
>>>>>>> same
>>>>>>> > id, then the second timer overwrites the first.  Several runners
>>>>>>> > therefore had maps from timer id -> timer. However since the
>>>>>>> > timer family scopes the timers, we now allow two timers with the
>>>>>>> same id
>>>>>>> > as long as the timer families are different. Runners had to be
>>>>>>> updated
>>>>>>> > to include the timer family id in the map keys.
>>>>>>> >
>>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>>>>> wonder
>>>>>>> > if this means that the Spark runner was incorrectly implementing
>>>>>>> the
>>>>>>> > Beam semantics before, and setTimer was not overwriting timers
>>>>>>> with the
>>>>>>> > same id?
>>>>>>> >
>>>>>>> > Reuven
>>>>>>> >
>>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <iemejia@gmail.com
>>>>>>> > <ma...@gmail.com>> wrote:
>>>>>>> >
>>>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>>>> >
>>>>>>> >     I have some questions (note I have not looked at the code at
>>>>>>> all).
>>>>>>> >
>>>>>>> >     - Is this working for both portable and non portable runners?
>>>>>>> >     - What do other runners need to implement to support this
>>>>>>> (e.g. Spark)?
>>>>>>> >
>>>>>>> >     Maybe worth to add this to the website Compatibility Matrix.
>>>>>>> >
>>>>>>> >     Regards,
>>>>>>> >     Ismaël
>>>>>>> >
>>>>>>> >
>>>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>>>> >     <rehman.muradali@venturedive.com
>>>>>>> >     <ma...@venturedive.com>> wrote:
>>>>>>> >
>>>>>>> >         Thank you Reuven for the guidance throughout the
>>>>>>> development
>>>>>>> >         process. I am delighted to contribute my two cents to the
>>>>>>> Beam
>>>>>>> >         project.
>>>>>>> >
>>>>>>> >         Looking forward to more active contributions.
>>>>>>> >
>>>>>>> >         *
>>>>>>> >         *
>>>>>>> >
>>>>>>> >         *Thanks & Regards____*
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >         *Rehman Murad Ali*
>>>>>>> >         Software Engineer
>>>>>>> >         Mobile: +92 3452076766 <+92%20345%202076766>
>>>>>>> <tel:+92%20345%202076766>
>>>>>>> >         Skype: rehman.muradali
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <
>>>>>>> relax@google.com
>>>>>>> >         <ma...@google.com>> wrote:
>>>>>>> >
>>>>>>> >             Thanks to a lot of hard work by Rehman, Beam now
>>>>>>> supports
>>>>>>> >             dynamic timers. As a reminder, this was discussed on
>>>>>>> the dev
>>>>>>> >             list some time back.
>>>>>>> >
>>>>>>> >             As background, previously one had to statically
>>>>>>> declare all
>>>>>>> >             timers in your code. So if you wanted to have two
>>>>>>> timers,
>>>>>>> >             you needed to create two timer variables and two
>>>>>>> callbacks -
>>>>>>> >             one for each timer. A number of users kept hitting
>>>>>>> stumbling
>>>>>>> >             blocks where they needed a dynamic set of timers (often
>>>>>>> >             based on the element), which was not supported in
>>>>>>> Beam. The
>>>>>>> >             workarounds were quite ugly and complicated.
>>>>>>> >
>>>>>>> >             The new support allows declaring a TimerMap, which is
>>>>>>> a map
>>>>>>> >             of timers. Each TimerMap is scoped by a family name,
>>>>>>> so you
>>>>>>> >             can create multiple TimerMaps each with its own
>>>>>>> callback.
>>>>>>> >             The use looks as follows:
>>>>>>> >
>>>>>>> >             class MyDoFn extends DoFn<...> {
>>>>>>> >                 @TimerFamily("timers")
>>>>>>> >                 private final TimerSpec timerMap =
>>>>>>> >                     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>> >
>>>>>>> >                 @ProcessElement
>>>>>>> >                 public void process(
>>>>>>> >                     @TimerFamily("timers") TimerMap timers, @Element Type e) {
>>>>>>> >                     timers.set("mainTimer", timestamp);
>>>>>>> >                     timers.set("actionType" + e.getActionType(), timestamp);
>>>>>>> >                 }
>>>>>>> >
>>>>>>> >                 @OnTimerFamily("timers")
>>>>>>> >                 public void onTimer(@TimerId String timerId) {
>>>>>>> >                     System.out.println("Timer fired. id: " + timerId);
>>>>>>> >                 }
>>>>>>> >             }
>>>>>>> >
>>>>>>> >             This currently works for the Flink and the Dataflow
>>>>>>> runners.
>>>>>>> >
>>>>>>> >             Thank you Rehman for getting this done! Beam users
>>>>>>> will find
>>>>>>> >             it very valuable.
>>>>>>> >
>>>>>>> >             Reuven
>>>>>>> >
>>>>>>>
>>>>>>
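
Reuven's portability question quoted above contrasts reading the timer id
statically from a PCollection name with obtaining it dynamically when a
timer fires. The sketch below shows one way the dynamic variant could
look: callbacks are registered per timer family, and each fired timer
carries its own id. Everything here is an assumption made for
illustration. TimerFamilyDispatcher and FiredTimer are hypothetical names,
and this is not how the FnApiRunner or the portability protos are
actually structured.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch; the names are illustrative and do not come from Beam. */
final class TimerFamilyDispatcher {

  /** A fired timer that carries its own id instead of having it implied by a stream name. */
  static final class FiredTimer {
    final String familyId;
    final String timerId;

    FiredTimer(String familyId, String timerId) {
      this.familyId = familyId;
      this.timerId = timerId;
    }
  }

  // One callback per statically declared family; timer ids inside a family are dynamic.
  private final Map<String, Consumer<String>> callbacks = new HashMap<>();

  /** Registers the callback that receives the id of every timer fired in this family. */
  void register(String familyId, Consumer<String> onTimer) {
    callbacks.put(familyId, onTimer);
  }

  /** Looks up the family callback and passes it the timer id read from the fired timer. */
  void dispatch(FiredTimer timer) {
    Consumer<String> callback = callbacks.get(timer.familyId);
    if (callback == null) {
      throw new IllegalArgumentException("Unknown timer family: " + timer.familyId);
    }
    callback.accept(timer.timerId);
  }
}

final class TimerFamilyDispatcherDemo {
  public static void main(String[] args) {
    TimerFamilyDispatcher dispatcher = new TimerFamilyDispatcher();
    dispatcher.register("timers", id -> System.out.println("Timer fired. id: " + id));
    dispatcher.dispatch(new TimerFamilyDispatcher.FiredTimer("timers", "mainTimer"));
    dispatcher.dispatch(new TimerFamilyDispatcher.FiredTimer("timers", "actionTypeClick"));
  }
}
```

The set of families stays static, so it could still map onto something
declared up front, while the timer id travels with the data.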

Re: Dynamic timers now supported!

Posted by Kenneth Knowles <ke...@apache.org>.
I think the (lack of) portability bit may have been buried in this thread.
Maybe a new thread about the design for that?

Kenn

On Sun, Feb 9, 2020 at 11:36 AM Reuven Lax <re...@google.com> wrote:

> FYI, this is now fixed for Dataflow. I also added better rejection so that
> runners that don't support this feature will reject the pipeline.
>
> On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <re...@google.com> wrote:
>
>> I took a look, and I think this was a simple bug. Testing a fix now.
>>
>> A larger question is how to support this in the portability layer. Right
>> now portability assumes that each timer id corresponds to a logical input
>> PCollection, but that assumption no longer works as we now support a
>> dynamic set of timers, each with their own id. We could instead model each
>> timer family as a PCollection, but the FnApiRunner would need to
>> dynamically get the timer id in order to invoke it, and today it statically
>> reads the timer id from the PCollection name.
>>
>> Reuven
>>
>> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>>> indeed never ran on any runner except for the DirectRunner, which is
>>> something I should've noticed in the code review.
>>>
>>> Reuven
>>>
>>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <ie...@gmail.com> wrote:
>>>
>>>> I had a discussion with Rehman last week and we discovered that the
>>>> TimerMap-related tests were not running for all runners because they
>>>> were not tagged as part of the ValidatesRunner category. I opened a
>>>> PR [1] to enable this, so could someone please help me with the
>>>> review/merge?
>>>>
>>>> I took a look out of curiosity and discovered that they only pass for
>>>> the Direct runner and for the classic Flink runner in batch mode. They
>>>> do not pass for Dataflow [2][3] or for the portable Flink runner, so it
>>>> is probably worth reopening the issue to investigate/fix.
>>>>
>>>> [1] https://github.com/apache/beam/pull/10747
>>>> [2]
>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>>> [3]
>>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>>
>>>>
>>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Yes. For now we exclude the flink runner, but fixing this should be
>>>>> fairly trivial.
>>>>>
>>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <mx...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> The Flink Runner was allowing to set a timer multiple times before we
>>>>>> made it comply with the Beam semantics of overwriting past
>>>>>> invocations.
>>>>>> I wouldn't be surprised if the Spark Runner never addressed this.
>>>>>> Flink
>>>>>> and Spark itself allow for a timer to be set to multiple times. In
>>>>>> order
>>>>>> to fix this for Beam, the Flink Runner has to maintain a checkpointed
>>>>>> map which sits outside of its builtin TimerService.
>>>>>>
>>>>>> As far as I can see, multiple timer families are currently not
>>>>>> supported
>>>>>> in the Flink Runner due to the map not taking the family name into
>>>>>> account. This can be easily fixed though.
>>>>>>
>>>>>> -Max
>>>>>>
>>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>>> > The new timer family is in the portability protos. I think
>>>>>> TimerReceiver
>>>>>> > needs to be updated to set it though (I think a 1-line change).
>>>>>> >
>>>>>> > The TimerInternals class that runners implement today already
>>>>>> handles
>>>>>> > dynamic timers, so most of the work was in the Beam SDK  to provide
>>>>>> an
>>>>>> > API that allows users to access this feature.
>>>>>> >
>>>>>> > The main work needed in the runner was to take in account the timer
>>>>>> > family. Beam semantics say that if a timer is set twice with the
>>>>>> same
>>>>>> > id, then the second timer overwrites the first.  Several runners
>>>>>> > therefore had maps from timer id -> timer. However since the
>>>>>> > timer family scopes the timers, we now allow two timers with the
>>>>>> same id
>>>>>> > as long as the timer families are different. Runners had to be
>>>>>> updated
>>>>>> > to include the timer family id in the map keys.
>>>>>> >
>>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>>>> wonder
>>>>>> > if this means that the Spark runner was incorrectly implementing
>>>>>> the
>>>>>> > Beam semantics before, and setTimer was not overwriting timers with
>>>>>> the
>>>>>> > same id?
>>>>>> >
>>>>>> > Reuven
>>>>>> >
>>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <iemejia@gmail.com
>>>>>> > <ma...@gmail.com>> wrote:
>>>>>> >
>>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>>> >
>>>>>> >     I have some questions (note I have not looked at the code at
>>>>>> all).
>>>>>> >
>>>>>> >     - Is this working for both portable and non portable runners?
>>>>>> >     - What do other runners need to implement to support this (e.g.
>>>>>> Spark)?
>>>>>> >
>>>>>> >     Maybe worth to add this to the website Compatibility Matrix.
>>>>>> >
>>>>>> >     Regards,
>>>>>> >     Ismaël
>>>>>> >
>>>>>> >
>>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>>> >     <rehman.muradali@venturedive.com
>>>>>> >     <ma...@venturedive.com>> wrote:
>>>>>> >
>>>>>> >         Thank you Reuven for the guidance throughout the development
>>>>>> >         process. I am delighted to contribute my two cents to the
>>>>>> Beam
>>>>>> >         project.
>>>>>> >
>>>>>> >         Looking forward to more active contributions.
>>>>>> >
>>>>>> >         *
>>>>>> >         *
>>>>>> >
>>>>>> >         *Thanks & Regards____*
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> >         *Rehman Murad Ali*
>>>>>> >         Software Engineer
>>>>>> >         Mobile: +92 3452076766 <+92%20345%202076766>
>>>>>> <tel:+92%20345%202076766>
>>>>>> >         Skype: rehman.muradali
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <
>>>>>> relax@google.com
>>>>>> >         <ma...@google.com>> wrote:
>>>>>> >
>>>>>> >             Thanks to a lot of hard work by Rehman, Beam now
>>>>>> supports
>>>>>> >             dynamic timers. As a reminder, this was discussed on
>>>>>> the dev
>>>>>> >             list some time back.
>>>>>> >
>>>>>> >             As background, previously one had to statically declare
>>>>>> all
>>>>>> >             timers in your code. So if you wanted to have two
>>>>>> timers,
>>>>>> >             you needed to create two timer variables and two
>>>>>> callbacks -
>>>>>> >             one for each timer. A number of users kept hitting
>>>>>> stumbling
>>>>>> >             blocks where they needed a dynamic set of timers (often
>>>>>> >             based on the element), which was not supported in Beam.
>>>>>> The
>>>>>> >             workarounds were quite ugly and complicated.
>>>>>> >
>>>>>> >             The new support allows declaring a TimerMap, which is a
>>>>>> map
>>>>>> >             of timers. Each TimerMap is scoped by a family name, so
>>>>>> you
>>>>>> >             can create multiple TimerMaps each with its own
>>>>>> callback.
>>>>>> >             The use looks as follows:
>>>>>> >
>>>>>> >             class MyDoFn extends DoFn<...> {
>>>>>> >                 @TimerFamily("timers")
>>>>>> >                 private final TimerSpec timerMap =
>>>>>> >                     TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>> >
>>>>>> >                 @ProcessElement
>>>>>> >                 public void process(
>>>>>> >                     @TimerFamily("timers") TimerMap timers, @Element Type e) {
>>>>>> >                     timers.set("mainTimer", timestamp);
>>>>>> >                     timers.set("actionType" + e.getActionType(), timestamp);
>>>>>> >                 }
>>>>>> >
>>>>>> >                 @OnTimerFamily("timers")
>>>>>> >                 public void onTimer(@TimerId String timerId) {
>>>>>> >                     System.out.println("Timer fired. id: " + timerId);
>>>>>> >                 }
>>>>>> >             }
>>>>>> >
>>>>>> >             This currently works for the Flink and the Dataflow
>>>>>> runners.
>>>>>> >
>>>>>> >             Thank you Rehman for getting this done! Beam users will
>>>>>> find
>>>>>> >             it very valuable.
>>>>>> >
>>>>>> >             Reuven
>>>>>> >
>>>>>>
>>>>>

Re: Dynamic timers now supported!

Posted by Reuven Lax <re...@google.com>.
FYI, this is now fixed for Dataflow. I also added better rejection so that
runners that don't support this feature will reject the pipeline.

On Sat, Feb 8, 2020 at 12:10 AM Reuven Lax <re...@google.com> wrote:

> I took a look, and I think this was a simple bug. Testing a fix now.
>
> A larger question is how to support this in the portability layer. Right
> now portability assumes that each timer id corresponds to a logical input
> PCollection, but that assumption no longer works as we now support a
> dynamic set of timers, each with their own id. We could instead model each
> timer family as a PCollection, but the FnApiRunner would need to
> dynamically get the timer id in order to invoke it, and today it statically
> reads the timer id from the PCollection name.
>
> Reuven
>
> On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <re...@google.com> wrote:
>
>> Thanks for finding this. Hopefully the bug is easy to fix. The tests
>> indeed never ran on any runner except for the DirectRunner, which is
>> something I should've noticed in the code review.
>>
>> Reuven
>>
>> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <ie...@gmail.com> wrote:
>>
>>> I had a discussion with Rehman last week and we discovered that the
>>> TimerMap-related tests were not running for all runners because they
>>> were not tagged as part of the ValidatesRunner category. I opened a
>>> PR [1] to enable this, so could someone please help me with the
>>> review/merge?
>>>
>>> I took a look out of curiosity and discovered that they only pass for
>>> the Direct runner and for the classic Flink runner in batch mode. They
>>> do not pass for Dataflow [2][3] or for the portable Flink runner, so it
>>> is probably worth reopening the issue to investigate/fix.
>>>
>>> [1] https://github.com/apache/beam/pull/10747
>>> [2]
>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>>> [3]
>>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>>
>>>
>>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Yes. For now we exclude the flink runner, but fixing this should be
>>>> fairly trivial.
>>>>
>>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <mx...@apache.org>
>>>> wrote:
>>>>
>>>>> The Flink Runner was allowing to set a timer multiple times before we
>>>>> made it comply with the Beam semantics of overwriting past
>>>>> invocations.
>>>>> I wouldn't be surprised if the Spark Runner never addressed this.
>>>>> Flink
>>>>> and Spark itself allow for a timer to be set to multiple times. In
>>>>> order
>>>>> to fix this for Beam, the Flink Runner has to maintain a checkpointed
>>>>> map which sits outside of its builtin TimerService.
>>>>>
>>>>> As far as I can see, multiple timer families are currently not
>>>>> supported
>>>>> in the Flink Runner due to the map not taking the family name into
>>>>> account. This can be easily fixed though.
>>>>>
>>>>> -Max
>>>>>
>>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>>> > The new timer family is in the portability protos. I think
>>>>> TimerReceiver
>>>>> > needs to be updated to set it though (I think a 1-line change).
>>>>> >
>>>>> > The TimerInternals class that runners implement today already
>>>>> handles
>>>>> > dynamic timers, so most of the work was in the Beam SDK  to provide
>>>>> an
>>>>> > API that allows users to access this feature.
>>>>> >
>>>>> > The main work needed in the runner was to take in account the timer
>>>>> > family. Beam semantics say that if a timer is set twice with the
>>>>> same
>>>>> > id, then the second timer overwrites the first.  Several runners
>>>>> > therefore had maps from timer id -> timer. However since the
>>>>> > timer family scopes the timers, we now allow two timers with the
>>>>> same id
>>>>> > as long as the timer families are different. Runners had to be
>>>>> updated
>>>>> > to include the timer family id in the map keys.
>>>>> >
>>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>>> wonder
>>>>> > if this means that the Spark runner was incorrectly implementing the
>>>>> > Beam semantics before, and setTimer was not overwriting timers with
>>>>> the
>>>>> > same id?
>>>>> >
>>>>> > Reuven
>>>>> >
>>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <iemejia@gmail.com
>>>>> > <ma...@gmail.com>> wrote:
>>>>> >
>>>>> >     This looks great, thanks for the contribution Rehman!
>>>>> >
>>>>> >     I have some questions (note I have not looked at the code at
>>>>> all).
>>>>> >
>>>>> >     - Is this working for both portable and non portable runners?
>>>>> >     - What do other runners need to implement to support this (e.g.
>>>>> Spark)?
>>>>> >
>>>>> >     Maybe worth to add this to the website Compatibility Matrix.
>>>>> >
>>>>> >     Regards,
>>>>> >     Ismaël
>>>>> >
>>>>> >
>>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>>> >     <rehman.muradali@venturedive.com
>>>>> >     <ma...@venturedive.com>> wrote:
>>>>> >
>>>>> >         Thank you Reuven for the guidance throughout the development
>>>>> >         process. I am delighted to contribute my two cents to the
>>>>> Beam
>>>>> >         project.
>>>>> >
>>>>> >         Looking forward to more active contributions.
>>>>> >
>>>>> >         *
>>>>> >         *
>>>>> >
>>>>> >         *Thanks & Regards____*
>>>>> >
>>>>> >
>>>>> >
>>>>> >         *Rehman Murad Ali*
>>>>> >         Software Engineer
>>>>> >         Mobile: +92 3452076766
>>>>> >         Skype: rehman.muradali
>>>>> >
>>>>> >
>>>>> >
>>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <
>>>>> relax@google.com
>>>>> >         <ma...@google.com>> wrote:
>>>>> >
>>>>> >             Thanks to a lot of hard work by Rehman, Beam now supports
>>>>> >             dynamic timers. As a reminder, this was discussed on the
>>>>> dev
>>>>> >             list some time back.
>>>>> >
>>>>> >             As background, previously one had to statically declare
>>>>> all
>>>>> >             timers in your code. So if you wanted to have two timers,
>>>>> >             you needed to create two timer variables and two
>>>>> callbacks -
>>>>> >             one for each timer. A number of users kept hitting
>>>>> stumbling
>>>>> >             blocks where they needed a dynamic set of timers (often
>>>>> >             based on the element), which was not supported in Beam.
>>>>> The
>>>>> >             workarounds were quite ugly and complicated.
>>>>> >
>>>>> >             The new support allows declaring a TimerMap, which is a
>>>>> map
>>>>> >             of timers. Each TimerMap is scoped by a family name, so
>>>>> you
>>>>> >             can create multiple TimerMaps each with its own callback.
>>>>> >             The use looks as follows:
>>>>> >
>>>>> >             class MyDoFn extends DoFn<...> {
>>>>> >                 @TimerFamily("timers")
>>>>> >                 private final TimerSpec timerMap =
>>>>> >             TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>> >
>>>>> >                 @ProcessElement
>>>>> >                  public void process(@TimerFamily("timers") TimerMap
>>>>> >             timers, @Element Type e) {
>>>>> >                     timers.set("mainTimer", timestamp);
>>>>> >                     timers.set("actionType" + e.getActionType(),
>>>>> timestamp);
>>>>> >                 }
>>>>> >
>>>>> >                @OnTimerFamily("timers")
>>>>> >                public void onTimer(@TimerId String timerId) {
>>>>> >                   System.out.println("Timer fired. id: " + timerId);
>>>>> >                }
>>>>> >             }
>>>>> >
>>>>> >             This currently works for the Flink and the Dataflow
>>>>> runners.
>>>>> >
>>>>> >             Thank you Rehman for getting this done! Beam users will
>>>>> find
>>>>> >             it very valuable.
>>>>> >
>>>>> >             Reuven
>>>>> >
>>>>>
>>>>

Re: Dynamic timers now supported!

Posted by Reuven Lax <re...@google.com>.
I took a look, and I think this was a simple bug. Testing a fix now.

A larger question is how to support this in the portability layer. Right
now portability assumes that each timer id corresponds to a logical input
PCollection, but that assumption no longer works as we now support a
dynamic set of timers, each with its own id. We could instead model each
timer family as a PCollection, but the FnApiRunner would need to
dynamically get the timer id in order to invoke it, and today it statically
reads the timer id from the PCollection name.
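
To make the second option concrete, here is a rough sketch of what
per-family dispatch could look like. Everything in it
(TimerFamilyDispatchSketch, FiredTimer, Dispatcher) is a made-up name for
illustration, not Beam or FnApiRunner code, and it assumes a fired timer
carries its family id and timer id as data rather than encoding the id in
a PCollection name:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch; none of these names are Beam or FnApiRunner APIs. */
public class TimerFamilyDispatchSketch {

  /** A fired timer as it might arrive on a per-family stream: the id travels with the data. */
  public static final class FiredTimer {
    final String familyId;
    final String timerId;

    public FiredTimer(String familyId, String timerId) {
      this.familyId = familyId;
      this.timerId = timerId;
    }
  }

  /** One callback per timer family is registered up front; timer ids stay dynamic. */
  public static final class Dispatcher {
    private final Map<String, Consumer<String>> callbacksByFamily = new HashMap<>();

    public void register(String familyId, Consumer<String> onTimer) {
      callbacksByFamily.put(familyId, onTimer);
    }

    /** Routes on the family, then hands the dynamically read timer id to the callback. */
    public void fire(FiredTimer timer) {
      Consumer<String> callback = callbacksByFamily.get(timer.familyId);
      if (callback == null) {
        throw new IllegalArgumentException("Unknown timer family: " + timer.familyId);
      }
      callback.accept(timer.timerId);
    }
  }

  public static void main(String[] args) {
    List<String> fired = new ArrayList<>();
    Dispatcher dispatcher = new Dispatcher();
    // Only the family name is known statically.
    dispatcher.register("timers", fired::add);
    // The timer ids are read from the fired timers themselves.
    dispatcher.fire(new FiredTimer("timers", "mainTimer"));
    dispatcher.fire(new FiredTimer("timers", "actionTypeClick"));
    System.out.println(fired);
  }
}
```

The point of the sketch is only that the static part of the graph shrinks
to one entry per family, while the timer id becomes part of the payload.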

Reuven

On Fri, Feb 7, 2020 at 2:22 PM Reuven Lax <re...@google.com> wrote:

> Thanks for finding this. Hopefully the bug is easy to fix. The tests
> indeed never ran on any runner except for the DirectRunner, which is
> something I should've noticed in the code review.
>
> Reuven
>
> On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <ie...@gmail.com> wrote:
>
>> I had a discussion with Rehman last week and we discovered that the
>> TimersMap
>> related tests were not running for all runners because they were not
>> tagged as
>> part of the ValidatesRunner category. I opened a PR [1] to enable this, so
>> please someone help me with the review/merge.
>>
>> I took a look just for curiosity and discovered that they are only
>> passing for
>> Direct runner and for the classic Flink runner in batch mode. They are not
>> passing for Dataflow [2][3] and for the Portable Flink runner, so
>> probably worth
>> to reopen the issue to investigate/fix.
>>
>> [1] https://github.com/apache/beam/pull/10747
>> [2]
>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
>> [3]
>> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>>
>>
>> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Yes. For now we exclude the flink runner, but fixing this should be
>>> fairly trivial.
>>>
>>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <mx...@apache.org>
>>> wrote:
>>>
>>>> The Flink Runner was allowing to set a timer multiple times before we
>>>> made it comply with the Beam semantics of overwriting past invocations.
>>>> I wouldn't be surprised if the Spark Runner never addressed this. Flink
>>>> and Spark themselves allow a timer to be set multiple times. In
>>>> order
>>>> to fix this for Beam, the Flink Runner has to maintain a checkpointed
>>>> map which sits outside of its builtin TimerService.
>>>>
>>>> As far as I can see, multiple timer families are currently not
>>>> supported
>>>> in the Flink Runner due to the map not taking the family name into
>>>> account. This can be easily fixed though.
>>>>
>>>> -Max
>>>>
>>>> On 24.01.20 21:31, Reuven Lax wrote:
>>>> > The new timer family is in the portability protos. I think
>>>> TimerReceiver
>>>> > needs to be updated to set it though (I think a 1-line change).
>>>> >
>>>> > The TimerInternals class that runners implement today already handles
>>>> > dynamic timers, so most of the work was in the Beam SDK  to provide
>>>> an
>>>> > API that allows users to access this feature.
>>>> >
>>>> > The main work needed in the runner was to take into account the timer
>>>> > family. Beam semantics say that if a timer is set twice with the same
>>>> > id, then the second timer overwrites the first.  Several runners
>>>> > therefore had maps from timer id -> timer. However since the
>>>> > timer family scopes the timers, we now allow two timers with the same
>>>> id
>>>> > as long as the timer families are different. Runners had to be
>>>> updated
>>>> > to include the timer family id in the map keys.
>>>> >
>>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>>> > ValidatesRunner, even though the Spark runner wasn't updated! I
>>>> wonder
>>>> > if this means that the Spark runner was incorrectly implementing the
>>>> > Beam semantics before, and setTimer was not overwriting timers with
>>>> the
>>>> > same id?
>>>> >
>>>> > Reuven
>>>> >
>>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <iemejia@gmail.com
>>>> > <ma...@gmail.com>> wrote:
>>>> >
>>>> >     This looks great, thanks for the contribution Rehman!
>>>> >
>>>> >     I have some questions (note I have not looked at the code at all).
>>>> >
>>>> >     - Is this working for both portable and non portable runners?
>>>> >     - What do other runners need to implement to support this (e.g.
>>>> Spark)?
>>>> >
>>>> >     Maybe worth to add this to the website Compatibility Matrix.
>>>> >
>>>> >     Regards,
>>>> >     Ismaël
>>>> >
>>>> >
>>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>>> >     <rehman.muradali@venturedive.com
>>>> >     <ma...@venturedive.com>> wrote:
>>>> >
>>>> >         Thank you Reuven for the guidance throughout the development
>>>> >         process. I am delighted to contribute my two cents to the Beam
>>>> >         project.
>>>> >
>>>> >         Looking forward to more active contributions.
>>>> >
>>>> >         *
>>>> >         *
>>>> >
>>>> >         *Thanks & Regards____*
>>>> >
>>>> >
>>>> >
>>>> >         *Rehman Murad Ali*
>>>> >         Software Engineer
>>>> >         Mobile: +92 3452076766
>>>> >         Skype: rehman.muradali
>>>> >
>>>> >
>>>> >
>>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <relax@google.com
>>>> >         <ma...@google.com>> wrote:
>>>> >
>>>> >             Thanks to a lot of hard work by Rehman, Beam now supports
>>>> >             dynamic timers. As a reminder, this was discussed on the
>>>> dev
>>>> >             list some time back.
>>>> >
>>>> >             As background, previously one had to statically declare
>>>> all
>>>> >             timers in your code. So if you wanted to have two timers,
>>>> >             you needed to create two timer variables and two
>>>> callbacks -
>>>> >             one for each timer. A number of users kept hitting
>>>> stumbling
>>>> >             blocks where they needed a dynamic set of timers (often
>>>> >             based on the element), which was not supported in Beam.
>>>> The
>>>> >             workarounds were quite ugly and complicated.
>>>> >
>>>> >             The new support allows declaring a TimerMap, which is a
>>>> map
>>>> >             of timers. Each TimerMap is scoped by a family name, so
>>>> you
>>>> >             can create multiple TimerMaps each with its own callback.
>>>> >             The use looks as follows:
>>>> >
>>>> >             class MyDoFn extends DoFn<...> {
>>>> >                 @TimerFamily("timers")
>>>> >                 private final TimerSpec timerMap =
>>>> >             TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>> >
>>>> >                 @ProcessElement
>>>> >                  public void process(@TimerFamily("timers") TimerMap
>>>> >             timers, @Element Type e) {
>>>> >                     timers.set("mainTimer", timestamp);
>>>> >                     timers.set("actionType" + e.getActionType(),
>>>> timestamp);
>>>> >                 }
>>>> >
>>>> >                @OnTimerFamily("timers")
>>>> >                public void onTimer(@TimerId String timerId) {
>>>> >                   System.out.println("Timer fired. id: " + timerId);
>>>> >                }
>>>> >             }
>>>> >
>>>> >             This currently works for the Flink and the Dataflow
>>>> runners.
>>>> >
>>>> >             Thank you Rehman for getting this done! Beam users will
>>>> find
>>>> >             it very valuable.
>>>> >
>>>> >             Reuven
>>>> >
>>>>
>>>

Re: Dynamic timers now supported!

Posted by Reuven Lax <re...@google.com>.
Thanks for finding this. Hopefully the bug is easy to fix. The tests
indeed never ran on any runner except for the DirectRunner, which is
something I should've noticed in the code review.

Reuven

On Mon, Feb 3, 2020 at 12:50 AM Ismaël Mejía <ie...@gmail.com> wrote:

> I had a discussion with Rehman last week and we discovered that the
> TimersMap
> related tests were not running for all runners because they were not
> tagged as
> part of the ValidatesRunner category. I opened a PR [1] to enable this, so
> please someone help me with the review/merge.
>
> I took a look just for curiosity and discovered that they are only passing
> for
> Direct runner and for the classic Flink runner in batch mode. They are not
> passing for Dataflow [2][3] and for the Portable Flink runner, so probably
> worth
> to reopen the issue to investigate/fix.
>
> [1] https://github.com/apache/beam/pull/10747
> [2]
> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/210/
> [3]
> https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/76/
>
>
> On Sat, Jan 25, 2020 at 1:26 AM Reuven Lax <re...@google.com> wrote:
>
>> Yes. For now we exclude the flink runner, but fixing this should be
>> fairly trivial.
>>
>> On Fri, Jan 24, 2020 at 3:35 PM Maximilian Michels <mx...@apache.org>
>> wrote:
>>
>>> The Flink Runner was allowing to set a timer multiple times before we
>>> made it comply with the Beam semantics of overwriting past invocations.
>>> I wouldn't be surprised if the Spark Runner never addressed this. Flink
>>> and Spark themselves allow a timer to be set multiple times. In order
>>> to fix this for Beam, the Flink Runner has to maintain a checkpointed
>>> map which sits outside of its builtin TimerService.
>>>
>>> As far as I can see, multiple timer families are currently not supported
>>> in the Flink Runner due to the map not taking the family name into
>>> account. This can be easily fixed though.
>>>
>>> -Max
>>>
>>> On 24.01.20 21:31, Reuven Lax wrote:
>>> > The new timer family is in the portability protos. I think
>>> TimerReceiver
>>> > needs to be updated to set it though (I think a 1-line change).
>>> >
>>> > The TimerInternals class that runners implement today already handles
>>> > dynamic timers, so most of the work was in the Beam SDK  to provide an
>>> > API that allows users to access this feature.
>>> >
>>> > The main work needed in the runner was to take into account the timer
>>> > family. Beam semantics say that if a timer is set twice with the same
>>> > id, then the second timer overwrites the first.  Several runners
>>> > therefore had maps from timer id -> timer. However since the
>>> > timer family scopes the timers, we now allow two timers with the same
>>> id
>>> > as long as the timer families are different. Runners had to be updated
>>> > to include the timer family id in the map keys.
>>> >
>>> > Surprisingly, the new TimerMap tests seem to pass on Spark
>>> > ValidatesRunner, even though the Spark runner wasn't updated! I wonder
>>> > if this means that the Spark runner was incorrectly implementing the
>>> > Beam semantics before, and setTimer was not overwriting timers with
>>> the
>>> > same id?
>>> >
>>> > Reuven
>>> >
>>> > On Fri, Jan 24, 2020 at 7:31 AM Ismaël Mejía <iemejia@gmail.com
>>> > <ma...@gmail.com>> wrote:
>>> >
>>> >     This looks great, thanks for the contribution Rehman!
>>> >
>>> >     I have some questions (note I have not looked at the code at all).
>>> >
>>> >     - Is this working for both portable and non portable runners?
>>> >     - What do other runners need to implement to support this (e.g.
>>> Spark)?
>>> >
>>> >     Maybe worth to add this to the website Compatibility Matrix.
>>> >
>>> >     Regards,
>>> >     Ismaël
>>> >
>>> >
>>> >     On Fri, Jan 24, 2020 at 8:42 AM Rehman Murad Ali
>>> >     <rehman.muradali@venturedive.com
>>> >     <ma...@venturedive.com>> wrote:
>>> >
>>> >         Thank you Reuven for the guidance throughout the development
>>> >         process. I am delighted to contribute my two cents to the Beam
>>> >         project.
>>> >
>>> >         Looking forward to more active contributions.
>>> >
>>> >         *
>>> >         *
>>> >
>>> >         *Thanks & Regards____*
>>> >
>>> >
>>> >
>>> >         *Rehman Murad Ali*
>>> >         Software Engineer
>>> >         Mobile: +92 3452076766
>>> >         Skype: rehman.muradali
>>> >
>>> >
>>> >
>>> >         On Thu, Jan 23, 2020 at 11:09 PM Reuven Lax <relax@google.com
>>> >         <ma...@google.com>> wrote:
>>> >
>>> >             Thanks to a lot of hard work by Rehman, Beam now supports
>>> >             dynamic timers. As a reminder, this was discussed on the
>>> dev
>>> >             list some time back.
>>> >
>>> >             As background, previously one had to statically declare all
>>> >             timers in your code. So if you wanted to have two timers,
>>> >             you needed to create two timer variables and two callbacks
>>> -
>>> >             one for each timer. A number of users kept hitting
>>> stumbling
>>> >             blocks where they needed a dynamic set of timers (often
>>> >             based on the element), which was not supported in Beam. The
>>> >             workarounds were quite ugly and complicated.
>>> >
>>> >             The new support allows declaring a TimerMap, which is a map
>>> >             of timers. Each TimerMap is scoped by a family name, so you
>>> >             can create multiple TimerMaps each with its own callback.
>>> >             The use looks as follows:
>>> >
>>> >             class MyDoFn extends DoFn<...> {
>>> >                 @TimerFamily("timers")
>>> >                 private final TimerSpec timerMap =
>>> >             TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>> >
>>> >                 @ProcessElement
>>> >                  public void process(@TimerFamily("timers") TimerMap
>>> >             timers, @Element Type e) {
>>> >                     timers.set("mainTimer", timestamp);
>>> >                     timers.set("actionType" + e.getActionType(),
>>> timestamp);
>>> >                 }
>>> >
>>> >                @OnTimerFamily("timers")
>>> >                public void onTimer(@TimerId String timerId) {
>>> >                   System.out.println("Timer fired. id: " + timerId);
>>> >                }
>>> >             }
>>> >
>>> >             This currently works for the Flink and the Dataflow
>>> runners.
>>> >
>>> >             Thank you Rehman for getting this done! Beam users will
>>> find
>>> >             it very valuable.
>>> >
>>> >             Reuven
>>> >
>>>
>>