Posted to dev@beam.apache.org by Reuven Lax <re...@google.com> on 2019/10/21 22:23:18 UTC

Proposal: Dynamic timer support (BEAM-6857)

BEAM-6857 documents the need for dynamic timer support in the Beam API. I
wanted to make a proposal for what this API would look like, and how to
express it in the portability protos.

Background: Today Beam (especially Beam Java) requires a ParDo to statically
declare all timers it accesses at compile time. For example:

class MyDoFn extends DoFn<String, String> {
  // Every timer is declared as a field, so the set is fixed at compile time.
  @TimerId("timer1")
  private final TimerSpec timer1 = TimerSpecs.timer(TimeDomain.EVENT_TIME);
  @TimerId("timer2")
  private final TimerSpec timer2 = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Element String e,
      @TimerId("timer1") Timer timer1,
      @TimerId("timer2") Timer timer2) {
    timer1.set(...);
    timer2.set(...);
  }

  // One callback per declared timer.
  @OnTimer("timer1") public void onTimer1() { ... }
  @OnTimer("timer2") public void onTimer2() { ... }
}

This requires the author of a ParDo to know the full list of timers ahead
of time, which has been problematic in many cases. One example is DSLs such
as Euphoria or Scio. DSL authors usually write ParDos that interpret code
written in the high-level DSL, and so don't know ahead of time which timers
are needed. The alternatives today are quite ugly: physical code generation,
or creating a single timer that multiplexes all of the user's logical
timers. There are also cases where a ParDo needs multiple distinct timers,
but the set of distinct timers is controlled by the input data and is
therefore not knowable in advance. The Beam timer API has been insufficient
for these use cases.
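
To make the multiplexing workaround concrete, here is a minimal sketch in
plain Java with no Beam dependency. It keeps the logical timers in a sorted
map and reports the timestamp for which the one physical timer should be
armed. TimerMultiplexer and all of its methods are hypothetical names used
only for illustration, not a Beam API, and in a real DoFn the bookkeeping
would have to live in a state cell so that it survives failures.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

/** Hypothetical helper: multiplexes many named logical timers onto one physical timer. */
final class TimerMultiplexer {
  // Logical timer name -> timestamp (millis) at which it should fire.
  private final Map<String, Long> byName = new HashMap<>();
  // Timestamp -> names due at that timestamp; sorted so the minimum is cheap to find.
  private final TreeMap<Long, List<String>> byTime = new TreeMap<>();

  /** Sets (or resets) a logical timer. */
  void set(String name, long timestampMillis) {
    clear(name);
    byName.put(name, timestampMillis);
    byTime.computeIfAbsent(timestampMillis, t -> new ArrayList<>()).add(name);
  }

  /** Cancels a logical timer if it is currently set. */
  void clear(String name) {
    Long old = byName.remove(name);
    if (old != null) {
      List<String> names = byTime.get(old);
      names.remove(name);
      if (names.isEmpty()) {
        byTime.remove(old);
      }
    }
  }

  /** Timestamp the single physical timer should be armed for, if anything is pending. */
  OptionalLong nextPhysicalFiring() {
    return byTime.isEmpty() ? OptionalLong.empty() : OptionalLong.of(byTime.firstKey());
  }

  /** Called when the physical timer fires: removes and returns all logical timers due by now. */
  List<String> fire(long nowMillis) {
    List<String> due = new ArrayList<>();
    while (!byTime.isEmpty() && byTime.firstKey() <= nowMillis) {
      for (String name : byTime.pollFirstEntry().getValue()) {
        byName.remove(name);
        due.add(name);
      }
    }
    return due;
  }

  public static void main(String[] args) {
    TimerMultiplexer mux = new TimerMultiplexer();
    mux.set("a", 30L);
    mux.set("b", 10L);
    System.out.println("arm physical timer for: " + mux.nextPhysicalFiring());
    System.out.println("due at t=30: " + mux.fire(30L));
  }
}
```

Every set, clear and fire has to update this bookkeeping and then re-arm
the physical timer, which is the extra complexity that user code carries
with this workaround.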

I propose a new TimerMap construct, which allows a ParDo to dynamically set
named timers. Its use in the Java API would look as follows:

class MyDoFn extends DoFn<String, String> {
  @TimerId("timers")
  private final TimerSpec timers = TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(@Element String e, @TimerId("timers") TimerMap timers) {
    // Timers are named at runtime; no per-timer declaration is needed.
    timers.set("timer1", ...);
    timers.set("timer2", ...);
  }

  @OnTimer("timer")
  public void onTimer(@TimerId String timerFired, @Timestamp Instant timerTs) { ... }
}

There is a new TimerSpec type to specify a TimerMap. The TimerMap class
itself allows dynamically setting multiple timers based on a String tag
argument. Each TimerMap has a single callback which, when called, is given
the id of the timer that is currently firing.

A ParDo may have multiple TimerMap objects (and must have more than one if
you want both processing-time and event-time timers in the same ParDo).
Each TimerMap is its own logical namespace; i.e., if the user sets timers
with the same string tag on different TimerMap objects, the timers will not
collide.
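
The namespace rule can be modeled in a few lines of plain Java. This is
only a sketch of the semantics described above, not the proposed Beam
implementation: TimerFamilyModel and its methods are hypothetical names,
timestamps are plain millisecond values, time domains are ignored, and a
timer is identified by the pair (map id, tag).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical model: each timer map is its own namespace of tagged timers. */
final class TimerFamilyModel {
  // Map id -> (timer tag -> firing timestamp in millis).
  private final Map<String, Map<String, Long>> families = new HashMap<>();

  /** Sets the timer identified by (familyId, tag); setting it again overwrites the timestamp. */
  void set(String familyId, String tag, long timestampMillis) {
    families.computeIfAbsent(familyId, f -> new HashMap<>()).put(tag, timestampMillis);
  }

  /** Fires the due timers of one map, passing (tag, timestamp) to that map's single callback. */
  void fireDue(String familyId, long nowMillis, BiConsumer<String, Long> onTimer) {
    Map<String, Long> timers = families.getOrDefault(familyId, new HashMap<>());
    List<Map.Entry<String, Long>> due = new ArrayList<>();
    for (Map.Entry<String, Long> e : timers.entrySet()) {
      if (e.getValue() <= nowMillis) {
        due.add(Map.entry(e.getKey(), e.getValue())); // copy, so the removal below is safe
      }
    }
    due.sort(Map.Entry.comparingByValue()); // fire in timestamp order
    for (Map.Entry<String, Long> e : due) {
      timers.remove(e.getKey());
      onTimer.accept(e.getKey(), e.getValue());
    }
  }

  public static void main(String[] args) {
    TimerFamilyModel model = new TimerFamilyModel();
    model.set("eventTimers", "timer1", 100L);
    model.set("processingTimers", "timer1", 200L); // same tag, different namespace
    model.fireDue("eventTimers", 150L,
        (tag, ts) -> System.out.println("eventTimers fired " + tag + " at " + ts));
    model.fireDue("processingTimers", 250L,
        (tag, ts) -> System.out.println("processingTimers fired " + tag + " at " + ts));
  }
}
```

Both maps hold a timer tagged "timer1", yet each fires independently and
only through the callback of the map it was set on.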

The portability protos are currently written to mirror the Java API,
expecting one TimerSpec per timer accessed by the ParDo. I suggest that we
instead make TimerMap the default for portability, and model the current
behavior on top of it. If this proves problematic for some runners, we
could instead introduce a new TimerSpec proto to represent TimerMap.

Thoughts?

Reuven

Re: Proposal: Dynamic timer support (BEAM-6857)

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Reuven,

first of all, a big +1 for this.

Next, a couple of questions. Do you target DSLs only, or do you expect
this to be used by end users as well? If only DSLs are the concern, then
I think:

  a) it is not only about timers; state has to be managed in the same
way (although the mentioned JIRA talks only about timers for now, so we
can start with that)

  b) maybe an alternative approach (also discussed several times on the
mailing lists) would be more flexible: expose the possibility of
providing runners directly with a DoFnSignature, which would enable DSLs
to hook in "closer" to runners

I don't currently have in mind what this "lower level" API would look
like, but for Euphoria this would be the preferred solution.

Although TimerMap looks general and flexible enough, the general
solution must include state (at least as a thought experiment).
Otherwise we risk enabling dynamic setup of timers only to find that
exposing the same functionality for state again requires exposing the
complete DoFnSignature (which would make TimerMap a somewhat redundant
feature).

Jan


Re: Proposal: Dynamic timer support (BEAM-6857)

Posted by Luke Cwik <lc...@google.com>.
Based upon the current description, from the portability perspective we
could:

Update the timer spec map comment[1] to be:
  // (Optional) A mapping of local timer families to timer specifications.
  map<string, TimerSpec> timer_specs = 5;

And update the timer coder to have the timer id[2]:
    // Encodes a timer containing a timestamp and a user specified payload.
    // The encoding is represented as: timestamp timer_id payload
    //   timestamp - a big endian 8 byte integer representing millis-since-epoch.
    //     The encoded representation is shifted so that the byte representation of
    //     negative values are lexicographically ordered before the byte representation
    //     of positive values. This is typically done by subtracting -9223372036854775808
    //     from the value and encoding it as a signed big endian integer. Example values:
    //
    //     -9223372036854775808: 00 00 00 00 00 00 00 00
    //                     -255: 7F FF FF FF FF FF FF 01
    //                       -1: 7F FF FF FF FF FF FF FF
    //                        0: 80 00 00 00 00 00 00 00
    //                        1: 80 00 00 00 00 00 00 01
    //                      256: 80 00 00 00 00 00 01 00
    //      9223372036854775807: FF FF FF FF FF FF FF FF
    //   timer_id - UTF8 string encoded using beam:coder:string_utf8:v1 format
    //   payload - user defined data, uses the component coder
    // Components: Coder for the payload.
    TIMER = 4 [(beam_urn) = "beam:coder:timer:v1"];
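
For readers who want to check the example values in that comment, here is a
small self-contained Java sketch of the timestamp part only. The class and
method names are made up for illustration; the timer_id and payload parts of
the encoding are not covered.

```java
import java.nio.ByteBuffer;

/** Sketch of the order-preserving timestamp encoding described in the proto comment. */
final class TimerTimestampEncoding {

  /** Shifts millis-since-epoch by Long.MIN_VALUE and writes it as 8 big-endian bytes. */
  static byte[] encode(long millisSinceEpoch) {
    // The subtraction wraps around, which is equivalent to flipping the sign bit.
    long shifted = millisSinceEpoch - Long.MIN_VALUE;
    // ByteBuffer writes in big-endian order by default.
    return ByteBuffer.allocate(Long.BYTES).putLong(shifted).array();
  }

  /** Inverse of encode: reads 8 big-endian bytes and undoes the shift. */
  static long decode(byte[] encoded) {
    return ByteBuffer.wrap(encoded).getLong() + Long.MIN_VALUE;
  }

  /** Formats bytes as space-separated upper-case hex, as in the proto comment. */
  static String toHex(byte[] bytes) {
    StringBuilder sb = new StringBuilder();
    for (byte b : bytes) {
      if (sb.length() > 0) {
        sb.append(' ');
      }
      sb.append(String.format("%02X", b));
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    long[] samples = {Long.MIN_VALUE, -255L, -1L, 0L, 1L, 256L, Long.MAX_VALUE};
    for (long v : samples) {
      System.out.println(v + ": " + toHex(encode(v)));
    }
  }
}
```

Because the shifted values compare the same way as unsigned integers, a
byte-wise lexicographic comparison of two encoded timestamps agrees with
comparing the original signed values.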

The rest is about plumbing this all through the SDKs and Runners.

[1] https://github.com/apache/beam/blob/79ba5458f9fd1a44c5c5778162e178dbee62bd64/model/pipeline/src/main/proto/beam_runner_api.proto#L372
[2] https://github.com/apache/beam/blob/79ba5458f9fd1a44c5c5778162e178dbee62bd64/model/pipeline/src/main/proto/beam_runner_api.proto#L595

On Tue, Oct 29, 2019 at 2:43 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Reuven,
>
> I didn't propose to restrict the model. The model can (and should) have
> multiple timers per key, even dynamic ones. The question was whether this
> can be done efficiently using a single timer (after all, the runner will
> probably have a single "timer service", so no matter what we expose at the
> API level, this will end up being multiplexed in the runner). It might also
> have the additional benefit of preventing bugs. But I'm not proposing this
> change for existing timers; it was more a question of whether we really
> must force runners to implement dynamic timers, or whether we can do it
> in the translation layer for all runners at once.
>
> Regarding the API - which is again a question independent of how it will be
> implemented - what do we need the @TimerFamily TimerSpec declaration for? I
> see two reasons:
>
>  a) it holds the time domain
>
>  b) it declares the DoFn as being stateful
>
> Property a) looks like it can be specified when setting the timer. b)
> could be inferred from @ProcessElement (or another method). What about:
>
> class MyDoFn extends DoFn<String, String> {
>   @ProcessElement
>   // declares @TimerContext which implies stateful DoFn
>   public void process(@Element String e, @TimerContext TimerContext timers) {
>     Timer timer1 = timers.get("timer1", EVENT_TIME);
>     Timer timer2 = timers.get("timer2", PROCESSING_TIME);
>     timer1.set(...);
>     timer2.set(...);
>   }
>
>   // empty name might be allowed iff the declaration contains @TimerContext,
>   // so that declares using dynamic timers
>   @OnTimer
>   public void onTimer(@TimerId String timerFired, @Timestamp Instant timerTs,
>       @TimerContext TimerContext timers) { ... }
> }
>
> I'm still seeking the analogy with dynamic state, because in this API
> that might become
>
> class MyDoFn extends DoFn<String, String> {
>   @ProcessElement
>   public void process(@Element String e, @StateContext StateContext states) {
>     ValueState state = states.get("myDynamicState", StateSpec...);
>     state.get(...);
>     state.set(...);
>   }
> }
>
> The point is that there seems to be no use for any declaration like
> @TimerFamily in the case of dynamic state, because there is no domain. It
> would feel weird to have to declare something for dynamic timers and not
> have to do it for state.
>
> Jan
>
> On 10/29/19 6:56 AM, Reuven Lax wrote:
>
> Just to circle back around, after the discussion on this thread I propose
> modifying the proposed API as follows:
>
> class MyDoFn extends DoFn<String, String> {
>   @TimerFamily("timers")
>   private final TimerSpec timers = TimerSpecs.timerFamily(TimeDomain.EVENT_TIME);
>
>   @ProcessElement
>   public void process(@Element String e, @TimerFamily("timers") TimerMap timers) {
>     timers.set("timer1", ...);
>     timers.set("timer2", ...);
>   }
>
>   @OnTimer("timers")
>   public void onTimer(@TimerId String timerFired, @Timestamp Instant timerTs,
>       @TimerFamily("timers") TimerMap timers) { ... }
> }
>
> Discussions around exposing DoFnSignature and DoFnInvoker to DSL authors
> are a bit independent (though not completely so, as it does relate), so I
> suggest splitting that into a separate discussion.
>
> Reuven
>
> On Mon, Oct 28, 2019 at 10:52 PM Reuven Lax <re...@google.com> wrote:
>
>>
>>
>> On Wed, Oct 23, 2019 at 1:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Reuven,
>>>
>>> yes, if this change is intended to be used by end users, then
>>> DoFnSignatures cannot be used; agreed on that. Regarding the relationship
>>> with dynamic state: I agree that this is a separate problem, but because
>>> it is close enough, we should know how we want to deal with it. Because
>>> state and timers share some functionality (after all, timers need state
>>> to be fault tolerant), these APIs should IMO share the same logic.
>>> Whatever solution is chosen to expose dynamic timers, it should extend to
>>> dynamic state.
>>>
>>> I'd like to dwell a little on the premise that users want dynamic
>>> timers (that is, timers whose *name* - and therefore behavior - is
>>> determined by incoming data). Could this case be modeled so that the
>>> timer has one more (implicit) state variable that holds a collection of
>>> (timestamp, name) tuples? The timer would then be invoked at the minimum
>>> of all currently set timestamps, with the respective name. The question
>>> here is probably whether this has a performance impact. That is to say:
>>> can any runner actually do anything different from this in terms of the
>>> time complexity of the algorithm?
>>>
>>
>> Yes - you could always multiplex many timers onto one. This is what some
>> users do today, but it tends to be very inefficient and also complex. The
>> Beam model requires runners to support dynamic timers per key (e.g. that's
>> how windowing is implemented - each window has a separate timer), so
>> there's no reason not to expose this to users.
>>
>>> I'm a little afraid that letting users define data-driven timers
>>> might be too restrictive for some runners. Yes, runners that don't have
>>> this option would probably be able to resort to the logic described
>>> above, but if this work could reasonably be done once for all runners,
>>> then we wouldn't force runners to actually implement it. And the API
>>> could still look as if the timers were actually dynamic.
>>>
>>> Jan
>>>
>>> P.S. If dynamic timers (and therefore any number of timers) can actually
>>> be implemented using a single timer, that might be an interesting
>>> pattern, because a single timer per (window, key) has many nice
>>> properties; for example, it implicitly avoids the situation where timer
>>> invocation is not ordered ([BEAM-7520]), which seems to be an issue for
>>> multiple runners (samza, portable flink).
>>>
>> BEAM-7520 is simply an implementation bug. I don't think it makes sense
>> to fix a bug by restricting the model.
>>
>>
>>> On 10/22/19 6:52 PM, Reuven Lax wrote:
>>>
>>> Kenn:
>>> +1 to using TimerFamily instead of TimerId and TimerMap.
>>>
>>> Jan:
>>> This is definitely not just for DSLs. I've definitely seen cases where
>>> the user wants different timers based on input data, so they cannot be
>>> defined statically. As a thought experiment: one stated goal of state +
>>> timers was to provide the low-level tools we use to implement windowing.
>>> However to implement windowing you need a dynamic set of timers, not just a
>>> single one. Now most users don't need to reimplement windowing (though we
>>> have had some users who had that need, when they wanted something slightly
>>> different than what native Beam windowing provided), however the need for
>>> dynamic timers is not unheard of.
>>>
>>> +1 to allowing dynamic state. However I think this is separate enough
>>> from timers that it doesn't need to be coupled in this discussion. Dynamic
>>> state also raises the wrinkle of pipeline compatibility (as you mentioned),
>>> which I think is a bit less of an issue for dynamic timers.
>>>
>>> Allowing a DSL to specify a DoFnSignature does not quite solve this
>>> problem. The DSL still needs a way to set and process the timers. It also
>>> does not solve the problem where the timers are based on input data
>>> elements, so cannot be known at pipeline construction time. However what
>>> might be more important is statically defining the timer families, and a
>>> DSL could do this by specifying a DoFnSignature (and something similar
>>> could be done with state). Also as mentioned above, this is useful to
>>> normal Beam users as well, and we shouldn't force normal users to start
>>> dealing with DoFnSignatures and DoFnInvokers.
>>>
>>> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Max,
>>>>
>>>> wouldn't that actually be the same as
>>>>
>>>> class MyDoFn extends DoFn<String, String> {
>>>>    @ProcessElement
>>>>    public void process(ProcessContext context) {
>>>>      // "get" would register a new TimerSpec
>>>>      Timer timer1 = context.getTimer("timer1");
>>>>      Timer timer2 = context.getTimer("timer2");
>>>>      timer1.set(...);
>>>>      timer2.set(...);
>>>>    }
>>>> }
>>>>
>>>> That is, no need to declare anything? One more concern about that: if
>>>> we allow registering timers (or even state) dynamically like that, it
>>>> might be harder to validate the pipeline upon upgrades.
>>>>
>>>> Jan
>>>>
>>>> On 10/22/19 4:47 PM, Maximilian Michels wrote:
>>>> > The idea makes sense to me. I really like that Beam gives upfront
>>>> > specs for timer and state, but it is not flexible enough for
>>>> > timer-based libraries or for users who want to dynamically generate
>>>> > timers.
>>>> >
>>>> > I'm not sure about the proposed API yet. Shouldn't we separate the
>>>> > timer specs from setting actual timers?
>>>> >
>>>> > Suggestion:
>>>> >
>>>> > class MyDoFn extends DoFn<String, String> {
>>>> >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>>>> >
>>>> >   @ProcessElement
>>>> >   public void process(
>>>> >       @Element String e,
>>>> >       @TimerMap TimerMap timers) {
>>>> >     // "get" would register a new TimerSpec
>>>> >     Timer timer1 = timers.get("timer1");
>>>> >     Timer timer2 = timers.get("timer2");
>>>> >     timer1.set(...);
>>>> >     timer2.set(...);
>>>> >   }
>>>> >
>>>> >   // No args for "@OnTimer" => use generic TimerMap
>>>> >   @OnTimer
>>>> >   public void onTimer(
>>>> >       @TimerId String timerFired,
>>>> >       @Timestamp Instant timerTs,
>>>> >       @TimerMap TimerMap timers) {
>>>> >      // Timer firing
>>>> >      ...
>>>> >      // Set this timer (or another)
>>>> >      Timer timer = timers.get(timerFired);
>>>> >      timer.set(...);
>>>> >   }
>>>> > }
>>>> >
>>>> > What do you think?
>>>> >
>>>> > -Max
>>>> >
>>>> > On 22.10.19 10:35, Jan Lukavský wrote:
>>>> >> Hi Kenn,
>>>> >>
>>>> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>>>> >>> This seems extremely useful.
>>>> >>>
>>>> >>> I assume you mean `@OnTimer("timers")` in your example. I would
>>>> >>> suggest that the parameter annotation be something other
>>>> >>> than @TimerId since that annotation is already used for a very
>>>> >>> similar but different purpose; they are close enough that it is
>>>> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
>>>> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>>>> >>> keep @TimerId in the parameter list and change the declaration
>>>> >>> to @TimerFamily("timers"). I think "family" or "group" may be more
>>>> >>> clear naming than "map".
>>>> >>>
>>>> >>> At the portability level, this API does seem to be pretty close
>>>> >>> to a noop in terms of the messages that need to be sent over the
>>>> >>> Fn API, so it makes sense to loosen the protos. By the time the Fn
>>>> >>> API is in play, all of our desires to catch errors prior to
>>>> >>> execution are irrelevant anyhow.
>>>> >>>
>>>> >>> On the other hand, I think DSLs have a different & bigger problem
>>>> >>> than this, in that they want to programmatically adjust all the
>>>> >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
>>>> >>> another. Certainly some limited DSL use cases are addressed by
>>>> >>> this, but I wouldn't take that as a primary use case for this
>>>> >>> feature. Ultimately they are probably better served by being able
>>>> >>> to explicitly author a DoFnInvoker and provide it to a variant of
>>>> >>> beam:transforms:ParDo where the do_fn field is a serialized
>>>> >>> DoFnInvoker. Now that I think about this, I cannot recall why we
>>>> >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
>>>> >>> That would allow maximum flexibility in utilizing the portability
>>>> >>> framework.
>>>> >>
>>>> >> yes, exactly, but when DSLs are in question, we have to make sure
>>>> >> that DSLs are not bound to portability - we have to be able to
>>>> >> translate even in case of "legacy" runners as well. That might
>>>> >> complicate things a bit maybe.
>>>> >>
>>>> >> Jan
>>>> >>
>>>> >>>
>>>> >>> Kenn
>>>> >>>

Re: Proposal: Dynamic timer support (BEAM-6857)

Posted by Reuven Lax <re...@google.com>.
Everyone on this thread seems to agree on the general idea, and remaining
discussions seem to be about details pertaining to the interface. Those are
easier to discuss on a PR, so I suggest we move ahead with a PR, and if
people want to they are welcome to comment on the PR.

Reuven

On Fri, Nov 1, 2019 at 11:07 AM Reuven Lax <re...@google.com> wrote:

> Hi Jan,
>
> Your proposal has merit, but I think using the TimerFamily specification
> is more consistent with the existing API. I think that a StateFamily can
> also have domains just like timers.
>
> Luke's suggestion for the proto changes sounds good.
>
> Reuven
>
> On Tue, Oct 29, 2019 at 2:43 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Reuven,
>>
>> I didn't propose to restrict the model. Model can (and should have)
>> multiple timers per key and even dynamic. The question was if this can be
>> made efficiently by using single timer (after all, the runner will probably
>> have single "timer service" so no matter what we expose on the API level,
>> this will end up being multiplexed in the runner). And it might have
>> additional benefits of preventing bugs. But I'm not proposing to do this
>> change for existing timers, that was more a question about if we really
>> must force runners to be able to implement dynamic timers or we can do it
>> on the translation layer generally for all runners at once.
>>
>> Regarding the API - which is again independent question of how it will be
>> implemented - what do we need the @TimerFamily TimerSpec declaration for? I
>> see two reasons:
>>
>>  a) it holds the time domain
>>
>>  b) it declares the DoFn as being stateful
>>
>> Property a) looks like it can be specified when setting the timer. b)
>> could be inferred from @ProcessElement (or other method). What about
>> class MyDoFn extends DoFn<String, String> {
>>   @ProcessElement
>>   // declares @TimerContext which implies stateful DoFn
>>   public void process(@Element String e, @TimerContext TimerContext
>> timers)) {
>>     Timer timer1 = timers.get("timer1", EVENT_TIME);
>>     Timer timer2 = timers.get("timer2", PROCESSING_TIME);
>>     timer1.set(...);
>>     timer2.set(...);
>>   }
>>
>>   // empty name might be allowed iff the declaration contains
>> @TimerContext, so that declares using dynamic timers
>>   @OnTimer public void onTimer(@TimerId String timerFired, @Timestamp
>> Instant timerTs, @TimerContext TimerContext timers) { ... }
>> }
>>
>> I'm still seeking the analogy with dynamic state, because in this API,
>> that might become
>>
>> class MyDoFn extends DoFn<String, String> {
>>   @ProcessElement
>>   public void process(@Element String e, @StateContext StateContext
>> states)) {
>>     ValueState state = states.get("myDynamicState", StateSpec...);
>>     state.get(...)
>>     state.set(...)
>>   }
>> }
>>
>> The point is that there seems to be no use for any declaration like
>> @TimerFamily in case of dynamic state, because there is no domain. It would
>> feel weird to have to declare something for dynamic timers and not have to
>> do it for state.
>>
>> Jan
>>
>> On 10/29/19 6:56 AM, Reuven Lax wrote:
>>
>> Just to circle back around, after the discussion on this thread I propose
>> modifying the proposed API as follows:
>>
>> class MyDoFn extends DoFn<String, String> {
>>   @TimerFamily("timers") TimerSpec timers =
>> TimerSpecs.timerFamily(TimeDomain(EVENT_TIME));
>>
>>   @ProcessElement
>>   public void process(@Element String e, @TimerFamily("timers") TimerMap
>> timers)) {
>>     timers.set("timer1", ...);
>>     timers.set("timer2", ...);
>>   }
>>
>>   @OnTimer("timer") public void onTimer(@TimerId String timerFired,
>> @Timestamp Instant timerTs, @TimerFamily("timers") TimerMap timers) {
>> ... }
>> }
>>
>> Discussions around exposing DoFnSignature and DoFnInvoker to DSL authors
>> are a bit independent (though not completely so, as it does relate), so I
>> suggest splitting that into a separate discussion.
>>
>> Reuven
>>
>> On Mon, Oct 28, 2019 at 10:52 PM Reuven Lax <re...@google.com> wrote:
>>
>>>
>>>
>>> On Wed, Oct 23, 2019 at 1:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Reuven,
>>>>
>>>> yes, if this change is intended to be used by end users, then
>>>> DoFnSignatures cannot be used, agree on that. Regarding the relationship
>>>> with dynamic state - I agree that this is separate problem, but because it
>>>> is close enough, we should know how we want to deal with that. Because
>>>> state and timers share some functionality (after all timers need state to
>>>> be fault tolerant), these API should IMO share the same logic. Whatever
>>>> solution chosen to expose dynamic timers, it should extend to dynamic state.
>>>>
>>>> I'd like to stop a little with the premise that users want dynamic
>>>> timers (that is timers whose *name* - and therefore behavior - is
>>>> determined by incoming data). Could this case be modeled so that the timer
>>>> actually has one more (implicit) state variable that actually holds
>>>> collection of tuples (timestamp, name)? Then the timer would be invoked at
>>>> given (minimum of all currently set) timestamps with respective name? The
>>>> question here probably is - can this have performance impact? That is to
>>>> say - can any runner actually do anything different from this in the sense
>>>> of time complexity of the algorithm?
>>>>
>>>
>>> Yes - you could always multiplex many timers one one. This is what some
>>> users do today, but it tends to be very inefficient and also complex. The
>>> Beam model requires runners to support dynamic timers per key (e.g. that
>>> how windowing is implemented - each window has a separate timer), so
>>> there's no reason not to expose this to users.
>>>
>>>> I'm a little afraid if actually letting users define data-driven timers
>>>> might not be too restrictive for some runners. Yes, runners that don't have
>>>> this option would probably be able to resort to the logic described above,
>>>> but if this work could be reasonably done for all runners, then we wouldn't
>>>> force runners to actually implement it. And, the API could look as if the
>>>> timers were actually dynamic.
>>>>
>>>> Jan
>>>>
>>>> P.S. If dynamic timers (and therefore any number of them) can actually be
>>>> implemented using a single timer, that might be an interesting pattern,
>>>> because a single timer per (window, key) has many nice properties; for
>>>> example, it implicitly avoids the situation where timer invocations are not
>>>> ordered ([BEAM-7520]), which seems to be an issue for multiple runners
>>>> (Samza, portable Flink).
>>>>
>>> BEAM-7520 is simply an implementation bug. I don't think it makes sense
>>> to fix a bug by restricting the model.
>>>
>>>
>>>> On 10/22/19 6:52 PM, Reuven Lax wrote:
>>>>
>>>> Kenn:
>>>> +1 to using TimerFamily instead of TimerId and TimerMap.
>>>>
>>>> Jan:
>>>> This is definitely not just for DSLs. I've definitely seen cases where
>>>> the user wants different timers based on input data, so they cannot be
>>>> defined statically. As a thought experiment: one stated goal of state +
>>>> timers was to provide the low-level tools we use to implement windowing.
>>>> However to implement windowing you need a dynamic set of timers, not just a
>>>> single one. Now most users don't need to reimplement windowing (though we
>>>> have had some users who had that need, when they wanted something slightly
>>>> different than what native Beam windowing provided), however the need for
>>>> dynamic timers is not unheard of.
>>>>
>>>> +1 to allowing dynamic state. However I think this is separate enough
>>>> from timers that it doesn't need to be coupled in this discussion. Dynamic
>>>> state also raises the wrinkle of pipeline compatibility (as you mentioned),
>>>> which I think is a bit less of an issue for dynamic timers.
>>>>
>>>> Allowing a DSL to specify a DoFnSignature does not quite solve this
>>>> problem. The DSL still needs a way to set and process the timers. It also
>>>> does not solve the problem where the timers are based on input data
>>>> elements, so cannot be known at pipeline construction time. However what
>>>> might be more important is statically defining the timer families, and a
>>>> DSL could do this by specifying a DoFnSignature (and something similar
>>>> could be done with state). Also as mentioned above, this is useful to
>>>> normal Beam users as well, and we shouldn't force normal users to start
>>>> dealing with DoFnSignatures and DoFnInvokers.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi Max,
>>>>>
>>>>> wouldn't that be actually the same as
>>>>>
>>>>> class MyDoFn extends DoFn<String, String> {
>>>>>
>>>>>
>>>>>    @ProcessElement
>>>>>    public void process(
>>>>>        ProcessContext context) {
>>>>>      // "get" would register a new TimerSpec
>>>>>      Timer timer1 = context.getTimer("timer1");
>>>>>      Timer timer2 = context.getTimer("timer2");
>>>>>      timers.set(...);
>>>>>      timers.set(...);
>>>>>    }
>>>>>
>>>>> That is - no need to declare anything? One more concern about that - if
>>>>> we allow registration of timers (or even state) dynamically like that, it
>>>>> might be harder to perform validation of the pipeline upon upgrades.
>>>>>
>>>>> Jan
>>>>>
>>>>> On 10/22/19 4:47 PM, Maximilian Michels wrote:
>>>>> > The idea makes sense to me. I really like that Beam gives upfront
>>>>> > specs for timer and state, but it is not flexible enough for
>>>>> > timer-based libraries or for users which want to dynamically
>>>>> generate
>>>>> > timers.
>>>>> >
>>>>> > I'm not sure about the proposed API yet. Shouldn't we separate the
>>>>> > timer specs from setting actual timers?
>>>>> >
>>>>> > Suggestion:
>>>>> >
>>>>> > class MyDoFn extends DoFn<String, String> {
>>>>> >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>>>>> >
>>>>> >   @ProcessElement
>>>>> >   public void process(
>>>>> >       @Element String e,
>>>>> >       @TimerMap TimerMap timers)) {
>>>>> >     // "get" would register a new TimerSpec
>>>>> >     Timer timer1 = timers.get("timer1");
>>>>> >     Timer timer2 = timers.get("timer2");
>>>>> >     timers.set(...);
>>>>> >     timers.set(...);
>>>>> >   }
>>>>> >
>>>>> >   // No args for "@OnTimer" => use generic TimerMap
>>>>> >   @OnTimer
>>>>> >   public void onTimer(
>>>>> >       @TimerId String timerFired,
>>>>> >       @Timestamp Instant timerTs,
>>>>> >       @TimerMap TimerMap timers) {
>>>>> >      // Timer firing
>>>>> >      ...
>>>>> >      // Set this timer (or another)
>>>>> >      Timer timer = timers.get(timerFired);
>>>>> >      timer.set(...);
>>>>> >   }
>>>>> > }
>>>>> >
>>>>> > What do you think?
>>>>> >
>>>>> > -Max
>>>>> >
>>>>> > On 22.10.19 10:35, Jan Lukavský wrote:
>>>>> >> Hi Kenn,
>>>>> >>
>>>>> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>>>>> >>> This seems extremely useful.
>>>>> >>>
>>>>> >>> I assume you mean `@OnTimer("timers")` in your example. I would
>>>>> >>> suggest that the parameter annotation be something other
>>>>> >>> than @TimerId since that annotation is already used for a very
>>>>> >>> similar but different purpose; they are close enough that it is
>>>>> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
>>>>> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>>>>> >>> keep @TimerId in the parameter list and change the declaration
>>>>> >>> to @TimerFamily("timers"). I think "family" or "group" may be more
>>>>> >>> clear naming than "map".
>>>>> >>>
>>>>> >>> At the portability level, this API does seem to be pretty close to
>>>>> a
>>>>> >>> noop in terms of the messages that needs to be sent over the Fn
>>>>> API,
>>>>> >>> so it makes sense to loosen the protos. By the time the Fn API is
>>>>> in
>>>>> >>> play, all of our desires to catch errors prior to execution are
>>>>> >>> irrelevant anyhow.
>>>>> >>>
>>>>> >>> On the other hand, I think DSLs have a different & bigger problem
>>>>> >>> than this, in that they want to programmatically adjust all the
>>>>> >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
>>>>> >>> another. Certainly some limited DSL use cases are addressed by
>>>>> this,
>>>>> >>> but I wouldn't take that as a primary use case for this feature.
>>>>> >>> Ultimately they are probably better served by being able to
>>>>> >>> explicitly author a DoFnInvoker and provide it to a variant of
>>>>> >>> beam:transforms:ParDo where the do_fn field is a serialized
>>>>> >>> DoFnInvoker. Now that I think about this, I cannot recall why we
>>>>> >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
>>>>> >>> That would allow maximum flexibility in utilizing the portability
>>>>> >>> framework.
>>>>> >>
>>>>> >> yes, exactly, but when DSLs are in question, we have to make sure
>>>>> >> that DSLs are not bound to portability - we have to be able to
>>>>> >> translate even in case of "legacy" runners as well. That might
>>>>> >> complicate things a bit maybe.
>>>>> >>
>>>>> >> Jan
>>>>> >>
>>>>> >>>
>>>>> >>> Kenn
>>>>> >>>
>>>>> >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <relax@google.com> wrote:
>>>>> >>>
>>>>> >>>     BEAM-6857 documents the need for dynamic timer support in the
>>>>> Beam
>>>>> >>>     API. I wanted to make a proposal for what this API would look
>>>>> >>>     like, and how to express it in the portability protos.
>>>>> >>>
>>>>> >>>     Background: Today Beam (especially BeamJava) requires a ParDo
>>>>> to
>>>>> >>>     statically declare all timers it accesses at compile time. For
>>>>> >>>     example:
>>>>> >>>
>>>>> >>>     class MyDoFn extends DoFn<String, String> {
>>>>> >>>       @TimerId("timer1") TimerSpec timer1 =
>>>>> >>>     TimerSpecs.timer(TimeDomain(EVENT_TIME));
>>>>> >>>       @TimerId("timer2") TimerSpec timer2 =
>>>>> >>>     TimerSpecs.timer(TimeDomain(PROCESSING_TIME));
>>>>> >>>
>>>>> >>>       @ProcessElement
>>>>> >>>       public void process(@Element String e, @TimerId("timer1")
>>>>> Timer
>>>>> >>>     timer1, @TimerId("timer2") Timer timer2)) {
>>>>> >>>         timer1.set(...);
>>>>> >>>         timer2.set(...);
>>>>> >>>       }
>>>>> >>>
>>>>> >>>       @OnTimer("timer1") public void onTimer1() { ... }
>>>>> >>>       @OnTimer("timer2") public void onTimer2() { ... }
>>>>> >>>     }
>>>>> >>>
>>>>> >>>     This requires the author of a ParDo to know the full list of
>>>>> >>>     timers ahead of time, which has been problematic in many cases.
>>>>> >>>     One example where it causes issues is for DSLs such as
>>>>> Euphoria or
>>>>> >>>     Scio. DSL authors usually write ParDos to interpret the code
>>>>> >>>     written in the high-level DSL, and so don't know ahead of time
>>>>> the
>>>>> >>>     list of timers needed; alternatives today are quite ugly:
>>>>> physical
>>>>> >>>     code generation or creating a single timer that multiplexes
>>>>> all of
>>>>> >>>     the users logical timers. There are also cases where a ParDo
>>>>> needs
>>>>> >>>     multiple distinct timers, but the set of distinct timers is
>>>>> >>>     controlled by the input data, and therefore not knowable in
>>>>> >>>     advance. The Beam timer API has been insufficient for these
>>>>> use
>>>>> >>> cases.
>>>>> >>>
>>>>> >>>     I propose a new TimerMap construct, which allow a ParDo to
>>>>> >>>     dynamically set named timers. It's use in the Java API would
>>>>> look
>>>>> >>>     as follows:
>>>>> >>>
>>>>> >>>     class MyDoFn extends DoFn<String, String> {
>>>>> >>>       @TimerId("timers") TimerSpec timers =
>>>>> >>>     TimerSpecs.timerMap(TimeDomain(EVENT_TIME));
>>>>> >>>
>>>>> >>>       @ProcessElement
>>>>> >>>       public void process(@Element String e, @TimerId("timers")
>>>>> >>>     TimerMap timer)) {
>>>>> >>>         timers.set("timer1", ...);
>>>>> >>>         timers.set("timer2", ...);
>>>>> >>>       }
>>>>> >>>
>>>>> >>>       @OnTimer("timer") public void onTimer(@TimerId String
>>>>> >>>     timerFired, @Timestamp Instant timerTs) { ... }
>>>>> >>>     }
>>>>> >>>
>>>>> >>>     There is a new TimerSpec type to specify a TimerMap. The
>>>>> TimerMap
>>>>> >>>     class itself allows dynamically setting multiple timers based
>>>>> on a
>>>>> >>>     String tag argument. Each TimerMap has a single callback which
>>>>> >>>     when called is given the id of the timer that is currently
>>>>> firing.
>>>>> >>>
>>>>> >>>     It is allowed to have multiple TimerMap objects in a ParDo (and
>>>>> >>>     required if you want to have both processing-time and
>>>>> event-time
>>>>> >>>     timers in the same ParDo). Each TimerMap is its own logical
>>>>> >>>     namespace. i.e. if the user sets timers with the same string
>>>>> tag
>>>>> >>>     on different TimerMap objects the timers will not collide.
>>>>> >>>
>>>>> >>>     Currently the portability protos were written to mirror the
>>>>> Java
>>>>> >>>     API, expecting one TimerSpec per timer accessed by the ParDo. I
>>>>> >>>     suggest that we instead make TimerMap the default for
>>>>> portability,
>>>>> >>>     and model the current behavior on top of timer map. If this
>>>>> proves
>>>>> >>>     problematic for some runners, we could instead introduce a new
>>>>> >>>     TimerSpec proto to represent TimerMap.
>>>>> >>>
>>>>> >>>     Thoughts?
>>>>> >>>
>>>>> >>>     Reuven
>>>>> >>>
>>>>>
>>>>

Re: Proposal: Dynamic timer support (BEAM-6857)

Posted by Reuven Lax <re...@google.com>.
Hi Jan,

Your proposal has merit, but I think using the TimerFamily specification is
more consistent with the existing API. I think that a StateFamily can also
have domains just like timers.

Luke's suggestion for the proto changes sounds good.

Reuven

On Tue, Oct 29, 2019 at 2:43 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Reuven,
>
> I didn't propose to restrict the model. Model can (and should have)
> multiple timers per key and even dynamic. The question was if this can be
> made efficiently by using single timer (after all, the runner will probably
> have single "timer service" so no matter what we expose on the API level,
> this will end up being multiplexed in the runner). And it might have
> additional benefits of preventing bugs. But I'm not proposing to do this
> change for existing timers, that was more a question about if we really
> must force runners to be able to implement dynamic timers or we can do it
> on the translation layer generally for all runners at once.
>
> Regarding the API - which is again independent question of how it will be
> implemented - what do we need the @TimerFamily TimerSpec declaration for? I
> see two reasons:
>
>  a) it holds the time domain
>
>  b) it declares the DoFn as being stateful
>
> Property a) looks like it can be specified when setting the timer. b)
> could be inferred from @ProcessElement (or other method). What about
>
> class MyDoFn extends DoFn<String, String> {
>   // declaring a @TimerContext parameter implies a stateful DoFn
>   @ProcessElement
>   public void process(@Element String e, @TimerContext TimerContext timers) {
>     Timer timer1 = timers.get("timer1", EVENT_TIME);
>     Timer timer2 = timers.get("timer2", PROCESSING_TIME);
>     timer1.set(...);
>     timer2.set(...);
>   }
>
>   // an empty name might be allowed iff the declaration contains
>   // @TimerContext, which declares the use of dynamic timers
>   @OnTimer
>   public void onTimer(@TimerId String timerFired, @Timestamp Instant timerTs,
>       @TimerContext TimerContext timers) { ... }
> }
>
> I'm still seeking the analogy with dynamic state, because in this API,
> that might become
>
> class MyDoFn extends DoFn<String, String> {
>   @ProcessElement
>   public void process(@Element String e, @StateContext StateContext states) {
>     ValueState state = states.get("myDynamicState", StateSpec...);
>     state.get(...);
>     state.set(...);
>   }
> }
>
> The point is that there seems to be no use for any declaration like
> @TimerFamily in case of dynamic state, because there is no domain. It would
> feel weird to have to declare something for dynamic timers and not have to
> do it for state.
>
> Jan
>
> On 10/29/19 6:56 AM, Reuven Lax wrote:
>
> Just to circle back around, after the discussion on this thread I propose
> modifying the proposed API as follows:
>
> class MyDoFn extends DoFn<String, String> {
>   @TimerFamily("timers") TimerSpec timers =
>       TimerSpecs.timerFamily(TimeDomain.EVENT_TIME);
>
>   @ProcessElement
>   public void process(@Element String e,
>       @TimerFamily("timers") TimerMap timers) {
>     timers.set("timer1", ...);
>     timers.set("timer2", ...);
>   }
>
>   @OnTimer("timers")
>   public void onTimer(@TimerId String timerFired, @Timestamp Instant timerTs,
>       @TimerFamily("timers") TimerMap timers) { ... }
> }
>
> Discussions around exposing DoFnSignature and DoFnInvoker to DSL authors
> are a bit independent (though not completely so, as it does relate), so I
> suggest splitting that into a separate discussion.
>
> Reuven
>
> On Mon, Oct 28, 2019 at 10:52 PM Reuven Lax <re...@google.com> wrote:
>
>>
>>
>> On Wed, Oct 23, 2019 at 1:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Reuven,
>>>
>>> yes, if this change is intended to be used by end users, then
>>> DoFnSignatures cannot be used, agree on that. Regarding the relationship
>>> with dynamic state - I agree that this is separate problem, but because it
>>> is close enough, we should know how we want to deal with that. Because
>>> state and timers share some functionality (after all timers need state to
>>> be fault tolerant), these API should IMO share the same logic. Whatever
>>> solution chosen to expose dynamic timers, it should extend to dynamic state.
>>>
>>> I'd like to stop a little with the premise that users want dynamic
>>> timers (that is timers whose *name* - and therefore behavior - is
>>> determined by incoming data). Could this case be modeled so that the timer
>>> actually has one more (implicit) state variable that actually holds
>>> collection of tuples (timestamp, name)? Then the timer would be invoked at
>>> given (minimum of all currently set) timestamps with respective name? The
>>> question here probably is - can this have performance impact? That is to
>>> say - can any runner actually do anything different from this in the sense
>>> of time complexity of the algorithm?
>>>
>>
>> Yes - you could always multiplex many timers onto one. This is what some
>> users do today, but it tends to be very inefficient and also complex. The
>> Beam model requires runners to support dynamic timers per key (e.g. that's
>> how windowing is implemented - each window has a separate timer), so
>> there's no reason not to expose this to users.
>>
>>> I'm a little afraid if actually letting users define data-driven timers
>>> might not be too restrictive for some runners. Yes, runners that don't have
>>> this option would probably be able to resort to the logic described above,
>>> but if this work could be reasonably done for all runners, then we wouldn't
>>> force runners to actually implement it. And, the API could look as if the
>>> timers were actually dynamic.
>>>
>>> Jan
>>>
>>> P.S. If dynamic timers (and therefore any number of them) can actually be
>>> implemented using a single timer, that might be an interesting pattern,
>>> because a single timer per (window, key) has many nice properties; for
>>> example, it implicitly avoids the situation where timer invocations are not
>>> ordered ([BEAM-7520]), which seems to be an issue for multiple runners
>>> (Samza, portable Flink).
>>>
>> BEAM-7520 is simply an implementation bug. I don't think it makes sense
>> to fix a bug by restricting the model.
>>
>>
>>> On 10/22/19 6:52 PM, Reuven Lax wrote:
>>>
>>> Kenn:
>>> +1 to using TimerFamily instead of TimerId and TimerMap.
>>>
>>> Jan:
>>> This is definitely not just for DSLs. I've definitely seen cases where
>>> the user wants different timers based on input data, so they cannot be
>>> defined statically. As a thought experiment: one stated goal of state +
>>> timers was to provide the low-level tools we use to implement windowing.
>>> However to implement windowing you need a dynamic set of timers, not just a
>>> single one. Now most users don't need to reimplement windowing (though we
>>> have had some users who had that need, when they wanted something slightly
>>> different than what native Beam windowing provided), however the need for
>>> dynamic timers is not unheard of.
>>>
>>> +1 to allowing dynamic state. However I think this is separate enough
>>> from timers that it doesn't need to be coupled in this discussion. Dynamic
>>> state also raises the wrinkle of pipeline compatibility (as you mentioned),
>>> which I think is a bit less of an issue for dynamic timers.
>>>
>>> Allowing a DSL to specify a DoFnSignature does not quite solve this
>>> problem. The DSL still needs a way to set and process the timers. It also
>>> does not solve the problem where the timers are based on input data
>>> elements, so cannot be known at pipeline construction time. However what
>>> might be more important is statically defining the timer families, and a
>>> DSL could do this by specifying a DoFnSignature (and something similar
>>> could be done with state). Also as mentioned above, this is useful to
>>> normal Beam users as well, and we shouldn't force normal users to start
>>> dealing with DoFnSignatures and DoFnInvokers.
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi Max,
>>>>
>>>> wouldn't that be actually the same as
>>>>
>>>> class MyDoFn extends DoFn<String, String> {
>>>>
>>>>
>>>>    @ProcessElement
>>>>    public void process(
>>>>        ProcessContext context) {
>>>>      // "get" would register a new TimerSpec
>>>>      Timer timer1 = context.getTimer("timer1");
>>>>      Timer timer2 = context.getTimer("timer2");
>>>>      timers.set(...);
>>>>      timers.set(...);
>>>>    }
>>>>
>>>> That is - no need to declare anything? One more concern about that - if
>>>> we allow registration of timers (or even state) dynamically like that, it
>>>> might be harder to perform validation of the pipeline upon upgrades.
>>>>
>>>> Jan
>>>>
>>>> On 10/22/19 4:47 PM, Maximilian Michels wrote:
>>>> > The idea makes sense to me. I really like that Beam gives upfront
>>>> > specs for timer and state, but it is not flexible enough for
>>>> > timer-based libraries or for users which want to dynamically generate
>>>> > timers.
>>>> >
>>>> > I'm not sure about the proposed API yet. Shouldn't we separate the
>>>> > timer specs from setting actual timers?
>>>> >
>>>> > Suggestion:
>>>> >
>>>> > class MyDoFn extends DoFn<String, String> {
>>>> >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>>>> >
>>>> >   @ProcessElement
>>>> >   public void process(
>>>> >       @Element String e,
>>>> >       @TimerMap TimerMap timers)) {
>>>> >     // "get" would register a new TimerSpec
>>>> >     Timer timer1 = timers.get("timer1");
>>>> >     Timer timer2 = timers.get("timer2");
>>>> >     timers.set(...);
>>>> >     timers.set(...);
>>>> >   }
>>>> >
>>>> >   // No args for "@OnTimer" => use generic TimerMap
>>>> >   @OnTimer
>>>> >   public void onTimer(
>>>> >       @TimerId String timerFired,
>>>> >       @Timestamp Instant timerTs,
>>>> >       @TimerMap TimerMap timers) {
>>>> >      // Timer firing
>>>> >      ...
>>>> >      // Set this timer (or another)
>>>> >      Timer timer = timers.get(timerFired);
>>>> >      timer.set(...);
>>>> >   }
>>>> > }
>>>> >
>>>> > What do you think?
>>>> >
>>>> > -Max
>>>> >
>>>> > On 22.10.19 10:35, Jan Lukavský wrote:
>>>> >> Hi Kenn,
>>>> >>
>>>> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>>>> >>> This seems extremely useful.
>>>> >>>
>>>> >>> I assume you mean `@OnTimer("timers")` in your example. I would
>>>> >>> suggest that the parameter annotation be something other
>>>> >>> than @TimerId since that annotation is already used for a very
>>>> >>> similar but different purpose; they are close enough that it is
>>>> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
>>>> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>>>> >>> keep @TimerId in the parameter list and change the declaration
>>>> >>> to @TimerFamily("timers"). I think "family" or "group" may be more
>>>> >>> clear naming than "map".
>>>> >>>
>>>> >>> At the portability level, this API does seem to be pretty close to
>>>> a
>>>> >>> noop in terms of the messages that needs to be sent over the Fn
>>>> API,
>>>> >>> so it makes sense to loosen the protos. By the time the Fn API is
>>>> in
>>>> >>> play, all of our desires to catch errors prior to execution are
>>>> >>> irrelevant anyhow.
>>>> >>>
>>>> >>> On the other hand, I think DSLs have a different & bigger problem
>>>> >>> than this, in that they want to programmatically adjust all the
>>>> >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
>>>> >>> another. Certainly some limited DSL use cases are addressed by
>>>> this,
>>>> >>> but I wouldn't take that as a primary use case for this feature.
>>>> >>> Ultimately they are probably better served by being able to
>>>> >>> explicitly author a DoFnInvoker and provide it to a variant of
>>>> >>> beam:transforms:ParDo where the do_fn field is a serialized
>>>> >>> DoFnInvoker. Now that I think about this, I cannot recall why we
>>>> >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
>>>> >>> That would allow maximum flexibility in utilizing the portability
>>>> >>> framework.
>>>> >>
>>>> >> yes, exactly, but when DSLs are in question, we have to make sure
>>>> >> that DSLs are not bound to portability - we have to be able to
>>>> >> translate even in case of "legacy" runners as well. That might
>>>> >> complicate things a bit maybe.
>>>> >>
>>>> >> Jan
>>>> >>
>>>> >>>
>>>> >>> Kenn
>>>> >>>
>>>> >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <relax@google.com> wrote:
>>>> >>>
>>>> >>>     BEAM-6857 documents the need for dynamic timer support in the
>>>> Beam
>>>> >>>     API. I wanted to make a proposal for what this API would look
>>>> >>>     like, and how to express it in the portability protos.
>>>> >>>
>>>> >>>     Background: Today Beam (especially BeamJava) requires a ParDo to
>>>> >>>     statically declare all timers it accesses at compile time. For
>>>> >>>     example:
>>>> >>>
>>>> >>>     class MyDoFn extends DoFn<String, String> {
>>>> >>>       @TimerId("timer1") TimerSpec timer1 =
>>>> >>>     TimerSpecs.timer(TimeDomain(EVENT_TIME));
>>>> >>>       @TimerId("timer2") TimerSpec timer2 =
>>>> >>>     TimerSpecs.timer(TimeDomain(PROCESSING_TIME));
>>>> >>>
>>>> >>>       @ProcessElement
>>>> >>>       public void process(@Element String e, @TimerId("timer1")
>>>> Timer
>>>> >>>     timer1, @TimerId("timer2") Timer timer2)) {
>>>> >>>         timer1.set(...);
>>>> >>>         timer2.set(...);
>>>> >>>       }
>>>> >>>
>>>> >>>       @OnTimer("timer1") public void onTimer1() { ... }
>>>> >>>       @OnTimer("timer2") public void onTimer2() { ... }
>>>> >>>     }
>>>> >>>
>>>> >>>     This requires the author of a ParDo to know the full list of
>>>> >>>     timers ahead of time, which has been problematic in many cases.
>>>> >>>     One example where it causes issues is for DSLs such as Euphoria
>>>> or
>>>> >>>     Scio. DSL authors usually write ParDos to interpret the code
>>>> >>>     written in the high-level DSL, and so don't know ahead of time
>>>> the
>>>> >>>     list of timers needed; alternatives today are quite ugly:
>>>> physical
>>>> >>>     code generation or creating a single timer that multiplexes all
>>>> of
>>>> >>>     the users logical timers. There are also cases where a ParDo
>>>> needs
>>>> >>>     multiple distinct timers, but the set of distinct timers is
>>>> >>>     controlled by the input data, and therefore not knowable in
>>>> >>>     advance. The Beam timer API has been insufficient for these use
>>>> >>> cases.
>>>> >>>
>>>> >>>     I propose a new TimerMap construct, which allow a ParDo to
>>>> >>>     dynamically set named timers. It's use in the Java API would
>>>> look
>>>> >>>     as follows:
>>>> >>>
>>>> >>>     class MyDoFn extends DoFn<String, String> {
>>>> >>>       @TimerId("timers") TimerSpec timers =
>>>> >>>     TimerSpecs.timerMap(TimeDomain(EVENT_TIME));
>>>> >>>
>>>> >>>       @ProcessElement
>>>> >>>       public void process(@Element String e, @TimerId("timers")
>>>> >>>     TimerMap timer)) {
>>>> >>>         timers.set("timer1", ...);
>>>> >>>         timers.set("timer2", ...);
>>>> >>>       }
>>>> >>>
>>>> >>>       @OnTimer("timer") public void onTimer(@TimerId String
>>>> >>>     timerFired, @Timestamp Instant timerTs) { ... }
>>>> >>>     }
>>>> >>>
>>>> >>>     There is a new TimerSpec type to specify a TimerMap. The
>>>> TimerMap
>>>> >>>     class itself allows dynamically setting multiple timers based
>>>> on a
>>>> >>>     String tag argument. Each TimerMap has a single callback which
>>>> >>>     when called is given the id of the timer that is currently
>>>> firing.
>>>> >>>
>>>> >>>     It is allowed to have multiple TimerMap objects in a ParDo (and
>>>> >>>     required if you want to have both processing-time and event-time
>>>> >>>     timers in the same ParDo). Each TimerMap is its own logical
>>>> >>>     namespace. i.e. if the user sets timers with the same string tag
>>>> >>>     on different TimerMap objects the timers will not collide.
>>>> >>>
>>>> >>>     Currently the portability protos were written to mirror the Java
>>>> >>>     API, expecting one TimerSpec per timer accessed by the ParDo. I
>>>> >>>     suggest that we instead make TimerMap the default for
>>>> portability,
>>>> >>>     and model the current behavior on top of timer map. If this
>>>> proves
>>>> >>>     problematic for some runners, we could instead introduce a new
>>>> >>>     TimerSpec proto to represent TimerMap.
>>>> >>>
>>>> >>>     Thoughts?
>>>> >>>
>>>> >>>     Reuven
>>>> >>>
>>>>
>>>

Re: Proposal: Dynamic timer support (BEAM-6857)

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Reuven,

I didn't propose restricting the model. The model can (and should) have
multiple timers per key, even dynamic ones. The question was whether this
can be done efficiently using a single timer (after all, the runner will
probably have a single "timer service", so no matter what we expose at the
API level, this will end up being multiplexed in the runner). It might also
have the additional benefit of preventing bugs. But I'm not proposing this
change for existing timers; it was more a question of whether we really
must force runners to implement dynamic timers, or whether we can do it in
the translation layer, generally, for all runners at once.
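
As an illustration of the single-timer multiplexing discussed above, here is a minimal, self-contained sketch in plain Java. It is not Beam code: the class TimerMultiplexer and its methods are hypothetical names invented for this example, and timestamps are plain long values. The sorted map plays the role of the implicit state variable holding (timestamp, name) tuples, and the one physical timer would always be armed at the minimum timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical helper: many named logical timers on top of one physical timer. */
class TimerMultiplexer {
  // name -> timestamp currently set for that logical timer
  private final Map<String, Long> byName = new HashMap<>();
  // timestamp -> names due at that timestamp, kept in timestamp order
  private final TreeMap<Long, TreeSet<String>> byTime = new TreeMap<>();

  /** Sets (or resets) the logical timer with the given name. */
  void set(String name, long timestamp) {
    clear(name);
    byName.put(name, timestamp);
    byTime.computeIfAbsent(timestamp, t -> new TreeSet<>()).add(name);
  }

  /** Removes the logical timer with the given name, if it is set. */
  void clear(String name) {
    Long old = byName.remove(name);
    if (old != null) {
      TreeSet<String> names = byTime.get(old);
      names.remove(name);
      if (names.isEmpty()) {
        byTime.remove(old);
      }
    }
  }

  /** Timestamp at which the single physical timer should be armed, if any. */
  OptionalLong nextFireTime() {
    return byTime.isEmpty() ? OptionalLong.empty() : OptionalLong.of(byTime.firstKey());
  }

  /** Called when the physical timer fires; returns the due names in timestamp order. */
  List<String> fire(long now) {
    List<String> fired = new ArrayList<>();
    while (!byTime.isEmpty() && byTime.firstKey() <= now) {
      for (String name : byTime.pollFirstEntry().getValue()) {
        byName.remove(name);
        fired.add(name);
      }
    }
    return fired;
  }
}
```

In this sketch each set, clear, or fire costs O(log n) in the number of logical timers, and firing is ordered by timestamp by construction. Whether a real runner could match its native timer service this way is exactly the open question in this thread.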

Regarding the API - which is again a question independent of how it will
be implemented - what do we need the @TimerFamily TimerSpec declaration
for? I see two reasons:

  a) it holds the time domain

  b) it declares the DoFn as being stateful

Property a) looks like it could be specified when setting the timer, and b)
could be inferred from @ProcessElement (or another method). What about:

class MyDoFn extends DoFn<String, String> {
   // declaring a @TimerContext parameter implies a stateful DoFn
   @ProcessElement
   public void process(@Element String e, @TimerContext TimerContext timers) {
     Timer timer1 = timers.get("timer1", EVENT_TIME);
     Timer timer2 = timers.get("timer2", PROCESSING_TIME);
     timer1.set(...);
     timer2.set(...);
   }

   // an empty name might be allowed iff the declaration contains
   // @TimerContext, which declares the use of dynamic timers
   @OnTimer
   public void onTimer(@TimerId String timerFired, @Timestamp Instant timerTs,
       @TimerContext TimerContext timers) { ... }
}

I'm still seeking the analogy with dynamic state, because in this API, 
that might become

class MyDoFn extends DoFn<String, String> {
   @ProcessElement
   public void process(@Element String e, @StateContext StateContext states) {
     ValueState state = states.get("myDynamicState", StateSpec...);
     state.get(...);
     state.set(...);
   }
}

The point is that there seems to be no use for a declaration like
@TimerFamily in the case of dynamic state, because state has no time
domain. It would feel odd to have to declare something for dynamic timers
but not for state.
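
To make the "time domain supplied at lookup time" idea concrete, here is a small, self-contained sketch in plain Java. None of these types are Beam classes: Domain, SketchTimer and SketchTimerContext are hypothetical stand-ins for TimeDomain, Timer and the proposed TimerContext. The rule that a name stays bound to the domain it was first requested with is an assumption of this sketch, not something settled in this thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a time domain. */
enum Domain { EVENT_TIME, PROCESSING_TIME }

/** Hypothetical stand-in for a timer handle. */
final class SketchTimer {
  final String name;
  final Domain domain;
  Long firesAt; // null while the timer is not set

  SketchTimer(String name, Domain domain) {
    this.name = name;
    this.domain = domain;
  }

  void set(long timestamp) {
    firesAt = timestamp;
  }
}

/** Hypothetical context: timers are created on first lookup, with no up-front declaration. */
final class SketchTimerContext {
  private final Map<String, SketchTimer> timers = new HashMap<>();

  SketchTimer get(String name, Domain domain) {
    SketchTimer timer = timers.computeIfAbsent(name, n -> new SketchTimer(n, domain));
    // Assumption of this sketch: a name cannot be reused with a different domain.
    if (timer.domain != domain) {
      throw new IllegalArgumentException(
          "Timer '" + name + "' was already registered with domain " + timer.domain);
    }
    return timer;
  }
}
```

The sketch also shows one cost of dropping the declaration: a domain mismatch for the same name can only be detected when the second lookup happens at runtime, whereas a declared family fixes the domain at pipeline construction time.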

Jan

On 10/29/19 6:56 AM, Reuven Lax wrote:
> Just to circle back around, after the discussion on this thread I 
> propose modifying the proposed API as follows:
>
> class MyDoFn extends DoFn<String, String> {
>   @TimerFamily("timers") TimerSpec timers =
>       TimerSpecs.timerFamily(TimeDomain.EVENT_TIME);
>
>   @ProcessElement
>   public void process(@Element String e,
>       @TimerFamily("timers") TimerMap timers) {
>     timers.set("timer1", ...);
>     timers.set("timer2", ...);
>   }
>
>   @OnTimer("timers")
>   public void onTimer(@TimerId String timerFired, @Timestamp Instant timerTs,
>       @TimerFamily("timers") TimerMap timers) { ... }
> }
>
> Discussions around exposing DoFnSignature and DoFnInvoker to DSL 
> authors are a bit independent (though not completely so, as it does 
> relate), so I suggest splitting that into a separate discussion.
>
> Reuven
>
> On Mon, Oct 28, 2019 at 10:52 PM Reuven Lax <relax@google.com> wrote:
>
>
>
>     On Wed, Oct 23, 2019 at 1:21 AM Jan Lukavský <je.ik@seznam.cz> wrote:
>
>         Hi Reuven,
>
>         yes, if this change is intended to be used by end users, then
>         DoFnSignatures cannot be used, agree on that. Regarding the
>         relationship with dynamic state - I agree that this is
>         separate problem, but because it is close enough, we should
>         know how we want to deal with that. Because state and timers
>         share some functionality (after all timers need state to be
>         fault tolerant), these API should IMO share the same logic.
>         Whatever solution chosen to expose dynamic timers, it should
>         extend to dynamic state.
>
>         I'd like to stop a little with the premise that users want
>         dynamic timers (that is timers whose *name* - and therefore
>         behavior - is determined by incoming data). Could this case be
>         modeled so that the timer actually has one more (implicit)
>         state variable that actually holds collection of tuples
>         (timestamp, name)? Then the timer would be invoked at given
>         (minimum of all currently set) timestamps with respective
>         name? The question here probably is - can this have
>         performance impact? That is to say - can any runner actually
>         do anything different from this in the sense of time
>         complexity of the algorithm?
>
>
>     Yes - you could always multiplex many timers one one. This is what
>     some users do today, but it tends to be very inefficient and also
>     complex. The Beam model requires runners to support dynamic timers
>     per key (e.g. that how windowing is implemented - each window has
>     a separate timer), so there's no reason not to expose this to users.
>
>         I'm a little afraid if actually letting users define
>         data-driven timers might not be too restrictive for some
>         runners. Yes, runners that don't have this option would
>         probably be able to resort to the logic described above, but
>         if this work could be reasonably done for all runners, then we
>         wouldn't force runners to actually implement it. And, the API
>         could look as if the timers were actually dynamic.
>
>         Jan
>
>         P.S. If dynamic (and therefore any number) of timers can be
>         actually implemented using single timer, that might be
>         interesting pattern, because single timer per (window, key)
>         has many nice properties, like it implicitly avoids situation
>         where timer invocation is not ordered ([BEAM-7520]), which
>         seems to issue for multiple runners (samza, portable flink).
>
>     BEAM-7520 is simply an implementation bug. I don't think it makes
>     sense to fix a bug by restricting the model.
>
>         On 10/22/19 6:52 PM, Reuven Lax wrote:
>>         Kenn:
>>         +1 to using TimerFamily instead of TimerId and TimerMap.
>>
>>         Jan:
>>         This is definitely not just for DSLs. I've definitely seen
>>         cases where the user wants different timers based on input
>>         data, so they cannot be defined statically. As a thought
>>         experiment: one stated goal of state + timers was to provide
>>         the low-level tools we use to implement windowing. However to
>>         implement windowing you need a dynamic set of timers, not
>>         just a single one. Now most users don't need to reimplement
>>         windowing (though we have had some users who had that need,
>>         when they wanted something slightly different than what
>>         native Beam windowing provided), however the need for dynamic
>>         timers is not unheard of.
>>
>>         +1 to allowing dynamic state. However I think this is
>>         separate enough from timers that it doesn't need to be
>>         coupled in this discussion. Dynamic state also raises the
>>         wrinkle of pipeline compatibility (as you mentioned), which I
>>         think is a bit less of an issue for dynamic timers.
>>
>>         Allowing a DSL to specify a DoFnSignature does not quite
>>         solve this problem. The DSL still needs a way to set and
>>         process the timers. It also does not solve the problem where
>>         the timers are based on input data elements, so cannot be
>>         known at pipeline construction time. However what might be
>>         more important is statically defining the timer families, and
>>         a DSL could do this by specifying a DoFnSignature (and
>>         something similar could be done with state). Also as
>>         mentioned above, this is useful to normal Beam users as well,
>>         and we shouldn't force normal users to start dealing with
>>         DoFnSignatures and DoFnInvokers.
>>
>>
>>
>>
>>
>>
>>         On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je.ik@seznam.cz
>>         <ma...@seznam.cz>> wrote:
>>
>>             Hi Max,
>>
>>             wouldn't that be actually the same as
>>
>>             class MyDoFn extends DoFn<String, String> {
>>
>>
>>                @ProcessElement
>>                public void process(
>>                    ProcessContext context) {
>>                  // "get" would register a new TimerSpec
>>                  Timer timer1 = context.getTimer("timer1");
>>                  Timer timer2 = context.getTimer("timer2");
>>                  timers.set(...);
>>                  timers.set(...);
>>                }
>>
>>             That is - no need to declare anything? One more concern
>>             about that - if
>>             we allow registration of timers (or even state)
>>             dynamically like that it
>>             might be harder to perform validation of pipeline upon
>>             upgrades.
>>
>>             Jan
>>
>>             On 10/22/19 4:47 PM, Maximilian Michels wrote:
>>             > The idea makes sense to me. I really like that Beam
>>             gives upfront
>>             > specs for timer and state, but it is not flexible
>>             enough for
>>             > timer-based libraries or for users which want to
>>             dynamically generate
>>             > timers.
>>             >
>>             > I'm not sure about the proposed API yet. Shouldn't we
>>             separate the
>>             > timer specs from setting actual timers?
>>             >
>>             > Suggestion:
>>             >
>>             > class MyDoFn extends DoFn<String, String> {
>>             >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>>             >
>>             >   @ProcessElement
>>             >   public void process(
>>             >       @Element String e,
>>             >       @TimerMap TimerMap timers)) {
>>             >     // "get" would register a new TimerSpec
>>             >     Timer timer1 = timers.get("timer1");
>>             >     Timer timer2 = timers.get("timer2");
>>             >     timers.set(...);
>>             >     timers.set(...);
>>             >   }
>>             >
>>             >   // No args for "@OnTimer" => use generic TimerMap
>>             >   @OnTimer
>>             >   public void onTimer(
>>             >       @TimerId String timerFired,
>>             >       @Timestamp Instant timerTs,
>>             >       @TimerMap TimerMap timers) {
>>             >      // Timer firing
>>             >      ...
>>             >      // Set this timer (or another)
>>             >      Timer timer = timers.get(timerFired);
>>             >      timer.set(...);
>>             >   }
>>             > }
>>             >
>>             > What do you think?
>>             >
>>             > -Max
>>             >
>>             > On 22.10.19 10:35, Jan Lukavský wrote:
>>             >> Hi Kenn,
>>             >>
>>             >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>>             >>> This seems extremely useful.
>>             >>>
>>             >>> I assume you mean `@OnTimer("timers")` in your
>>             example. I would
>>             >>> suggest that the parameter annotation be something other
>>             >>> than @TimerId since that annotation is already used
>>             for a very
>>             >>> similar but different purpose; they are close enough
>>             that it is
>>             >>> tempting to pun them, but it is clearer to keep them
>>             distinct IMO.
>>             >>> Perhaps @TimerName or @TimerKey or some such.
>>             Alternatively,
>>             >>> keep @TimerId in the parameter list and change the
>>             declaration
>>             >>> to @TimerFamily("timers"). I think "family" or
>>             "group" may be more
>>             >>> clear naming than "map".
>>             >>>
>>             >>> At the portability level, this API does seem to be
>>             pretty close to a
>>             >>> noop in terms of the messages that needs to be sent
>>             over the Fn API,
>>             >>> so it makes sense to loosen the protos. By the time
>>             the Fn API is in
>>             >>> play, all of our desires to catch errors prior to
>>             execution are
>>             >>> irrelevant anyhow.
>>             >>>
>>             >>> On the other hand, I think DSLs have a different &
>>             bigger problem
>>             >>> than this, in that they want to programmatically
>>             adjust all the
>>             >>> capabilities of a DoFn. Same goes for wrapping one
>>             DoFn in
>>             >>> another. Certainly some limited DSL use cases are
>>             addressed by this,
>>             >>> but I wouldn't take that as a primary use case for
>>             this feature.
>>             >>> Ultimately they are probably better served by being
>>             able to
>>             >>> explicitly author a DoFnInvoker and provide it to a
>>             variant of
>>             >>> beam:transforms:ParDo where the do_fn field is a
>>             serialized
>>             >>> DoFnInvoker. Now that I think about this, I cannot
>>             recall why we
>>             >>> don't already ship a DoFnSignature & DoFnInvoker as
>>             the payload.
>>             >>> That would allow maximum flexibility in utilizing the
>>             portability
>>             >>> framework.
>>             >>
>>             >> yes, exactly, but when DSLs are in question, we have
>>             to make sure
>>             >> that DSLs are not bound to portability - we have to be
>>             able to
>>             >> translate even in case of "legacy" runners as well.
>>             That might
>>             >> complicate things a bit maybe.
>>             >>
>>             >> Jan
>>             >>
>>             >>>
>>             >>> Kenn
>>             >>>
>>             >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax
>>             <relax@google.com <ma...@google.com>
>>             >>> <mailto:relax@google.com <ma...@google.com>>>
>>             wrote:
>>             >>>
>>             >>>     BEAM-6857 documents the need for dynamic timer
>>             support in the Beam
>>             >>>     API. I wanted to make a proposal for what this
>>             API would look
>>             >>>     like, and how to express it in the portability
>>             protos.
>>             >>>
>>             >>>     Background: Today Beam (especially BeamJava)
>>             requires a ParDo to
>>             >>>     statically declare all timers it accesses at
>>             compile time. For
>>             >>>     example:
>>             >>>
>>             >>>     class MyDoFn extends DoFn<String, String> {
>>             >>>       @TimerId("timer1") TimerSpec timer1 =
>>             >>> TimerSpecs.timer(TimeDomain(EVENT_TIME));
>>             >>>       @TimerId("timer2") TimerSpec timer2 =
>>             >>> TimerSpecs.timer(TimeDomain(PROCESSING_TIME));
>>             >>>
>>             >>>       @ProcessElement
>>             >>>       public void process(@Element String
>>             e, @TimerId("timer1") Timer
>>             >>>     timer1, @TimerId("timer2") Timer timer2)) {
>>             >>>         timer1.set(...);
>>             >>>         timer2.set(...);
>>             >>>       }
>>             >>>
>>             >>>       @OnTimer("timer1") public void onTimer1() { ... }
>>             >>>       @OnTimer("timer2") public void onTimer2() { ... }
>>             >>>     }
>>             >>>
>>             >>>     This requires the author of a ParDo to know the
>>             full list of
>>             >>>     timers ahead of time, which has been problematic
>>             in many cases.
>>             >>>     One example where it causes issues is for DSLs
>>             such as Euphoria or
>>             >>>     Scio. DSL authors usually write ParDos to
>>             interpret the code
>>             >>>     written in the high-level DSL, and so don't know
>>             ahead of time the
>>             >>>     list of timers needed; alternatives today are
>>             quite ugly: physical
>>             >>>     code generation or creating a single timer that
>>             multiplexes all of
>>             >>>     the users logical timers. There are also cases
>>             where a ParDo needs
>>             >>>     multiple distinct timers, but the set of distinct
>>             timers is
>>             >>>     controlled by the input data, and therefore not
>>             knowable in
>>             >>>     advance. The Beam timer API has been insufficient
>>             for these use
>>             >>> cases.
>>             >>>
>>             >>>     I propose a new TimerMap construct, which allow a
>>             ParDo to
>>             >>>     dynamically set named timers. It's use in the
>>             Java API would look
>>             >>>     as follows:
>>             >>>
>>             >>>     class MyDoFn extends DoFn<String, String> {
>>             >>>       @TimerId("timers") TimerSpec timers =
>>             >>> TimerSpecs.timerMap(TimeDomain(EVENT_TIME));
>>             >>>
>>             >>>       @ProcessElement
>>             >>>       public void process(@Element String
>>             e, @TimerId("timers")
>>             >>>     TimerMap timer)) {
>>             >>>         timers.set("timer1", ...);
>>             >>>         timers.set("timer2", ...);
>>             >>>       }
>>             >>>
>>             >>>       @OnTimer("timer") public void onTimer(@TimerId
>>             String
>>             >>>     timerFired, @Timestamp Instant timerTs) { ... }
>>             >>>     }
>>             >>>
>>             >>>     There is a new TimerSpec type to specify a
>>             TimerMap. The TimerMap
>>             >>>     class itself allows dynamically setting multiple
>>             timers based on a
>>             >>>     String tag argument. Each TimerMap has a single
>>             callback which
>>             >>>     when called is given the id of the timer that is
>>             currently firing.
>>             >>>
>>             >>>     It is allowed to have multiple TimerMap objects
>>             in a ParDo (and
>>             >>>     required if you want to have both processing-time
>>             and event-time
>>             >>>     timers in the same ParDo). Each TimerMap is its
>>             own logical
>>             >>>     namespace. i.e. if the user sets timers with the
>>             same string tag
>>             >>>     on different TimerMap objects the timers will not
>>             collide.
>>             >>>
>>             >>>     Currently the portability protos were written to
>>             mirror the Java
>>             >>>     API, expecting one TimerSpec per timer accessed
>>             by the ParDo. I
>>             >>>     suggest that we instead make TimerMap the default
>>             for portability,
>>             >>>     and model the current behavior on top of timer
>>             map. If this proves
>>             >>>     problematic for some runners, we could instead
>>             introduce a new
>>             >>>     TimerSpec proto to represent TimerMap.
>>             >>>
>>             >>>     Thoughts?
>>             >>>
>>             >>>     Reuven
>>             >>>
>>

Re: Proposal: Dynamic timer support (BEAM-6857)

Posted by Reuven Lax <re...@google.com>.
Just to circle back around, after the discussion on this thread I propose
modifying the proposed API as follows:

class MyDoFn extends DoFn<String, String> {
  @TimerFamily("timers") TimerSpec timers =
      TimerSpecs.timerFamily(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(@Element String e,
      @TimerFamily("timers") TimerMap timers) {
    timers.set("timer1", ...);
    timers.set("timer2", ...);
  }

  @OnTimer("timers") public void onTimer(@TimerId String timerFired,
      @Timestamp Instant timerTs, @TimerFamily("timers") TimerMap timers) {
    ...
  }
}
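
To make the intended semantics concrete, here is a minimal, self-contained
sketch in plain Java. It does not use Beam at all: SimTimerFamily, its
OnTimer callback and advanceTo are hypothetical names invented for
illustration, and the "setting a tag again replaces its timestamp" behavior
is an assumption of the sketch rather than something the proposal spells
out. It only models the two properties discussed in this thread: one
callback per family that is told which timer fired, and a separate
namespace per family.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of timer families; not Beam code. */
final class TimerFamilySketch {

  /** Stand-in for one timer family: dynamic tags, one shared callback. */
  static final class SimTimerFamily {
    /** Single callback shared by every timer in the family. */
    interface OnTimer {
      void fire(String familyId, String timerId, long timestamp);
    }

    private final String familyId;
    private final OnTimer callback;
    // Dynamic timer tag -> firing timestamp. Each family owns its own map,
    // so equal tags in different families never collide.
    private final Map<String, Long> pending = new HashMap<>();

    SimTimerFamily(String familyId, OnTimer callback) {
      this.familyId = familyId;
      this.callback = callback;
    }

    /** Sets the timer with the given tag (assumption: a second set replaces the first). */
    void set(String timerId, long timestamp) {
      pending.put(timerId, timestamp);
    }

    /** Fires all timers due at or before the given time, earliest first. */
    void advanceTo(long time) {
      List<Map.Entry<String, Long>> due = new ArrayList<>();
      for (Map.Entry<String, Long> e : pending.entrySet()) {
        if (e.getValue() <= time) {
          due.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
      }
      due.sort((x, y) -> {
        int byTime = Long.compare(x.getValue(), y.getValue());
        return byTime != 0 ? byTime : x.getKey().compareTo(y.getKey());
      });
      // Clear the due timers first so the callback may safely set new ones.
      for (Map.Entry<String, Long> e : due) {
        pending.remove(e.getKey());
      }
      for (Map.Entry<String, Long> e : due) {
        callback.fire(familyId, e.getKey(), e.getValue());
      }
    }
  }

  /** Sets timers on two families and returns the firings in order. */
  static List<String> run() {
    List<String> fired = new ArrayList<>();
    SimTimerFamily.OnTimer log =
        (family, id, ts) -> fired.add(family + "/" + id + "@" + ts);
    SimTimerFamily familyA = new SimTimerFamily("familyA", log);
    SimTimerFamily familyB = new SimTimerFamily("familyB", log);

    familyA.set("timer1", 10);
    familyA.set("timer2", 20);
    familyB.set("timer1", 15); // same tag as in familyA, separate namespace
    familyA.set("timer1", 12); // replaces the earlier timestamp for this tag

    familyA.advanceTo(15);
    familyB.advanceTo(15);
    familyA.advanceTo(30);
    familyB.advanceTo(30);
    return fired;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

In this model the callback never needs a per-timer method: the tag that
fired arrives as an argument, which is the role @TimerId plays in the
onTimer signature above.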

Discussions around exposing DoFnSignature and DoFnInvoker to DSL authors
are largely independent of this proposal (though related), so I suggest
splitting them into a separate discussion.

Reuven

On Mon, Oct 28, 2019 at 10:52 PM Reuven Lax <re...@google.com> wrote:

>
>
> On Wed, Oct 23, 2019 at 1:21 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Reuven,
>>
>> yes, if this change is intended to be used by end users, then
>> DoFnSignatures cannot be used, agreed. Regarding the relationship
>> with dynamic state - I agree that this is a separate problem, but because it
>> is close enough, we should know how we want to deal with it. Because
>> state and timers share some functionality (after all, timers need state to
>> be fault tolerant), these APIs should IMO share the same logic. Whatever
>> solution is chosen to expose dynamic timers, it should extend to dynamic state.
>>
>> I'd like to pause on the premise that users want dynamic timers
>> (that is, timers whose *name* - and therefore behavior - is determined by
>> incoming data). Could this case be modeled so that the timer has
>> one more (implicit) state variable that holds a collection of tuples
>> (timestamp, name)? The timer would then be invoked at the minimum of all
>> currently set timestamps, with the respective name. The question here probably
>> is - can this have a performance impact? That is to say - can any runner
>> actually do anything different from this in terms of the time complexity of
>> the algorithm?
>>
>
> Yes - you could always multiplex many timers onto one. This is what some
> users do today, but it tends to be very inefficient and also complex. The
> Beam model requires runners to support dynamic timers per key (e.g. that
> is how windowing is implemented - each window has a separate timer), so
> there's no reason not to expose this to users.
>
>> I'm a little afraid that letting users define data-driven timers
>> might be too restrictive for some runners. Yes, runners that don't have
>> this option would probably be able to resort to the logic described above,
>> but if this work could be reasonably done for all runners, then we wouldn't
>> force runners to actually implement it. And the API could look as if the
>> timers were actually dynamic.
>>
>> Jan
>>
>> P.S. If dynamic timers (and therefore any number of timers) can actually
>> be implemented using a single timer, that might be an interesting pattern,
>> because a single timer per (window, key) has many nice properties; for
>> example, it implicitly avoids the situation where timer invocation is not
>> ordered ([BEAM-7520]), which seems to be an issue for multiple runners
>> (samza, portable flink).
>>
> BEAM-7520 is simply an implementation bug. I don't think it makes sense to
> fix a bug by restricting the model.
>
>
>> On 10/22/19 6:52 PM, Reuven Lax wrote:
>>
>> Kenn:
>> +1 to using TimerFamily instead of TimerId and TimerMap.
>>
>> Jan:
>> This is definitely not just for DSLs. I've definitely seen cases where
>> the user wants different timers based on input data, so they cannot be
>> defined statically. As a thought experiment: one stated goal of state +
>> timers was to provide the low-level tools we use to implement windowing.
>> However to implement windowing you need a dynamic set of timers, not just a
>> single one. Now most users don't need to reimplement windowing (though we
>> have had some users who had that need, when they wanted something slightly
>> different than what native Beam windowing provided), however the need for
>> dynamic timers is not unheard of.
>>
>> +1 to allowing dynamic state. However I think this is separate enough
>> from timers that it doesn't need to be coupled in this discussion. Dynamic
>> state also raises the wrinkle of pipeline compatibility (as you mentioned),
>> which I think is a bit less of an issue for dynamic timers.
>>
>> Allowing a DSL to specify a DoFnSignature does not quite solve this
>> problem. The DSL still needs a way to set and process the timers. It also
>> does not solve the problem where the timers are based on input data
>> elements, so cannot be known at pipeline construction time. However what
>> might be more important is statically defining the timer families, and a
>> DSL could do this by specifying a DoFnSignature (and something similar
>> could be done with state). Also as mentioned above, this is useful to
>> normal Beam users as well, and we shouldn't force normal users to start
>> dealing with DoFnSignatures and DoFnInvokers.
>>
>>
>>
>>
>>
>>
>> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Max,
>>>
>>> wouldn't that be actually the same as
>>>
>>> class MyDoFn extends DoFn<String, String> {
>>>
>>>    @ProcessElement
>>>    public void process(
>>>        ProcessContext context) {
>>>      // "get" would register a new TimerSpec
>>>      Timer timer1 = context.getTimer("timer1");
>>>      Timer timer2 = context.getTimer("timer2");
>>>      timer1.set(...);
>>>      timer2.set(...);
>>>    }
>>> }
>>>
>>> That is - no need to declare anything? One more concern about that - if
>>> we allow registration of timers (or even state) dynamically like that, it
>>> might be harder to validate the pipeline upon upgrades.
>>>
>>> Jan
>>>
>>> On 10/22/19 4:47 PM, Maximilian Michels wrote:
>>> > The idea makes sense to me. I really like that Beam gives upfront
>>> > specs for timer and state, but it is not flexible enough for
>>> > timer-based libraries or for users which want to dynamically generate
>>> > timers.
>>> >
>>> > I'm not sure about the proposed API yet. Shouldn't we separate the
>>> > timer specs from setting actual timers?
>>> >
>>> > Suggestion:
>>> >
>>> > class MyDoFn extends DoFn<String, String> {
>>> >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>>> >
>>> >   @ProcessElement
>>> >   public void process(
>>> >       @Element String e,
>>> >       @TimerMap TimerMap timers) {
>>> >     // "get" would register a new TimerSpec
>>> >     Timer timer1 = timers.get("timer1");
>>> >     Timer timer2 = timers.get("timer2");
>>> >     timer1.set(...);
>>> >     timer2.set(...);
>>> >   }
>>> >
>>> >   // No args for "@OnTimer" => use generic TimerMap
>>> >   @OnTimer
>>> >   public void onTimer(
>>> >       @TimerId String timerFired,
>>> >       @Timestamp Instant timerTs,
>>> >       @TimerMap TimerMap timers) {
>>> >      // Timer firing
>>> >      ...
>>> >      // Set this timer (or another)
>>> >      Timer timer = timers.get(timerFired);
>>> >      timer.set(...);
>>> >   }
>>> > }
>>> >
>>> > What do you think?
>>> >
>>> > -Max
>>> >
>>> > On 22.10.19 10:35, Jan Lukavský wrote:
>>> >> Hi Kenn,
>>> >>
>>> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>>> >>> This seems extremely useful.
>>> >>>
>>> >>> I assume you mean `@OnTimer("timers")` in your example. I would
>>> >>> suggest that the parameter annotation be something other
>>> >>> than @TimerId since that annotation is already used for a very
>>> >>> similar but different purpose; they are close enough that it is
>>> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
>>> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>>> >>> keep @TimerId in the parameter list and change the declaration
>>> >>> to @TimerFamily("timers"). I think "family" or "group" may be more
>>> >>> clear naming than "map".
>>> >>>
>>> >>> At the portability level, this API does seem to be pretty close to a
>>> >>> noop in terms of the messages that need to be sent over the Fn API,
>>> >>> so it makes sense to loosen the protos. By the time the Fn API is in
>>> >>> play, all of our desires to catch errors prior to execution are
>>> >>> irrelevant anyhow.
>>> >>>
>>> >>> On the other hand, I think DSLs have a different & bigger problem
>>> >>> than this, in that they want to programmatically adjust all the
>>> >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
>>> >>> another. Certainly some limited DSL use cases are addressed by this,
>>> >>> but I wouldn't take that as a primary use case for this feature.
>>> >>> Ultimately they are probably better served by being able to
>>> >>> explicitly author a DoFnInvoker and provide it to a variant of
>>> >>> beam:transforms:ParDo where the do_fn field is a serialized
>>> >>> DoFnInvoker. Now that I think about this, I cannot recall why we
>>> >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
>>> >>> That would allow maximum flexibility in utilizing the portability
>>> >>> framework.
>>> >>
>>> >> yes, exactly, but when DSLs are in question, we have to make sure
>>> >> that DSLs are not bound to portability - we have to be able to
>>> >> translate even in case of "legacy" runners as well. That might
>>> >> complicate things a bit maybe.
>>> >>
>>> >> Jan
>>> >>
>>> >>>
>>> >>> Kenn
>>> >>>
>>> >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <relax@google.com> wrote:
>>> >>>
>>> >>>     BEAM-6857 documents the need for dynamic timer support in the
>>> >>>     Beam API. I wanted to make a proposal for what this API would
>>> >>>     look like, and how to express it in the portability protos.
>>> >>>
>>> >>>     Background: Today Beam (especially BeamJava) requires a ParDo to
>>> >>>     statically declare all timers it accesses at compile time. For
>>> >>>     example:
>>> >>>
>>> >>>     class MyDoFn extends DoFn<String, String> {
>>> >>>       @TimerId("timer1") TimerSpec timer1 =
>>> >>>           TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>> >>>       @TimerId("timer2") TimerSpec timer2 =
>>> >>>           TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>> >>>
>>> >>>       @ProcessElement
>>> >>>       public void process(@Element String e,
>>> >>>           @TimerId("timer1") Timer timer1,
>>> >>>           @TimerId("timer2") Timer timer2) {
>>> >>>         timer1.set(...);
>>> >>>         timer2.set(...);
>>> >>>       }
>>> >>>
>>> >>>       @OnTimer("timer1") public void onTimer1() { ... }
>>> >>>       @OnTimer("timer2") public void onTimer2() { ... }
>>> >>>     }
>>> >>>
>>> >>>     This requires the author of a ParDo to know the full list of
>>> >>>     timers ahead of time, which has been problematic in many cases.
>>> >>>     One example where it causes issues is for DSLs such as Euphoria
>>> >>>     or Scio. DSL authors usually write ParDos to interpret the code
>>> >>>     written in the high-level DSL, and so don't know ahead of time
>>> >>>     the list of timers needed; alternatives today are quite ugly:
>>> >>>     physical code generation or creating a single timer that
>>> >>>     multiplexes all of the user's logical timers. There are also
>>> >>>     cases where a ParDo needs multiple distinct timers, but the set
>>> >>>     of distinct timers is controlled by the input data, and
>>> >>>     therefore not knowable in advance. The Beam timer API has been
>>> >>>     insufficient for these use cases.
>>> >>>
>>> >>>     I propose a new TimerMap construct, which allows a ParDo to
>>> >>>     dynamically set named timers. Its use in the Java API would look
>>> >>>     as follows:
>>> >>>
>>> >>>     class MyDoFn extends DoFn<String, String> {
>>> >>>       @TimerId("timers") TimerSpec timers =
>>> >>>           TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>> >>>
>>> >>>       @ProcessElement
>>> >>>       public void process(@Element String e,
>>> >>>           @TimerId("timers") TimerMap timers) {
>>> >>>         timers.set("timer1", ...);
>>> >>>         timers.set("timer2", ...);
>>> >>>       }
>>> >>>
>>> >>>       @OnTimer("timer") public void onTimer(@TimerId String
>>> >>>           timerFired, @Timestamp Instant timerTs) { ... }
>>> >>>     }
>>> >>>
>>> >>>     There is a new TimerSpec type to specify a TimerMap. The TimerMap
>>> >>>     class itself allows dynamically setting multiple timers based on
>>> >>>     a String tag argument. Each TimerMap has a single callback which
>>> >>>     when called is given the id of the timer that is currently
>>> >>>     firing.
>>> >>>
>>> >>>     It is allowed to have multiple TimerMap objects in a ParDo (and
>>> >>>     required if you want to have both processing-time and event-time
>>> >>>     timers in the same ParDo). Each TimerMap is its own logical
>>> >>>     namespace, i.e. if the user sets timers with the same string tag
>>> >>>     on different TimerMap objects the timers will not collide.
>>> >>>
>>> >>>     Currently the portability protos were written to mirror the Java
>>> >>>     API, expecting one TimerSpec per timer accessed by the ParDo. I
>>> >>>     suggest that we instead make TimerMap the default for
>>> >>>     portability, and model the current behavior on top of timer map.
>>> >>>     If this proves problematic for some runners, we could instead
>>> >>>     introduce a new TimerSpec proto to represent TimerMap.
>>> >>>
>>> >>>     Thoughts?
>>> >>>
>>> >>>     Reuven
>>> >>>
>>>
>>

Re: Proposal: Dynamic timer support (BEAM-6857)

Posted by Reuven Lax <re...@google.com>.
On Wed, Oct 23, 2019 at 1:21 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Reuven,
>
> yes, if this change is intended to be used by end users, then
> DoFnSignatures cannot be used, agreed. Regarding the relationship
> with dynamic state - I agree that this is a separate problem, but because it
> is close enough, we should know how we want to deal with it. Because
> state and timers share some functionality (after all, timers need state to
> be fault tolerant), these APIs should IMO share the same logic. Whatever
> solution is chosen to expose dynamic timers, it should extend to dynamic state.
>
> I'd like to pause on the premise that users want dynamic timers
> (that is, timers whose *name* - and therefore behavior - is determined by
> incoming data). Could this case be modeled so that the timer has
> one more (implicit) state variable that holds a collection of tuples
> (timestamp, name)? The timer would then be invoked at the minimum of all
> currently set timestamps, with the respective name. The question here probably
> is - can this have a performance impact? That is to say - can any runner
> actually do anything different from this in terms of the time complexity of
> the algorithm?
>

Yes - you could always multiplex many timers onto one. This is what some
users do today, but it tends to be very inefficient and also complex. The
Beam model requires runners to support dynamic timers per key (e.g. that
is how windowing is implemented - each window has a separate timer), so
there's no reason not to expose this to users.
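
For readers unfamiliar with the multiplexing workaround, the following is a
minimal, self-contained sketch of it in plain Java, following the
(timestamp, name) collection described in the quoted question above. It
does not use Beam: MultiplexedTimerSketch, set, onPhysicalTimer and the
physicalTimer field are hypothetical names, and the in-memory map merely
stands in for whatever state a real DoFn would have to keep.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: many logical timers multiplexed onto one physical timer. */
final class MultiplexedTimerSketch {
  // The "implicit state": logical timer name -> timestamp.
  private final Map<String, Long> pending = new HashMap<>();
  // Stand-in for the single real timer; null means "not set".
  private Long physicalTimer = null;

  /** Sets a logical timer and re-arms the physical timer at the new minimum. */
  void set(String name, long timestamp) {
    pending.put(name, timestamp);
    rearm();
  }

  Long physicalTimer() {
    return physicalTimer;
  }

  /** Handles a firing of the physical timer; returns the logical timers that were due. */
  List<String> onPhysicalTimer(long firingTime) {
    List<String> due = new ArrayList<>();
    for (Map.Entry<String, Long> e : pending.entrySet()) {
      if (e.getValue() <= firingTime) {
        due.add(e.getKey());
      }
    }
    // Order by timestamp, then by name, before the entries are removed.
    due.sort((x, y) -> {
      int byTime = Long.compare(pending.get(x), pending.get(y));
      return byTime != 0 ? byTime : x.compareTo(y);
    });
    for (String name : due) {
      pending.remove(name);
    }
    rearm();
    return due;
  }

  // Scans every pending entry to find the next firing time.
  private void rearm() {
    Long min = null;
    for (Long ts : pending.values()) {
      if (min == null || ts < min) {
        min = ts;
      }
    }
    physicalTimer = min;
  }

  /** Small scenario: three logical timers, two firings of the physical timer. */
  static List<String> run() {
    MultiplexedTimerSketch timers = new MultiplexedTimerSketch();
    List<String> log = new ArrayList<>();
    timers.set("a", 30);
    timers.set("b", 10);
    timers.set("c", 20);
    log.add("armed@" + timers.physicalTimer());
    log.add("fired" + timers.onPhysicalTimer(10));
    log.add("armed@" + timers.physicalTimer());
    log.add("fired" + timers.onPhysicalTimer(30));
    log.add("armed@" + timers.physicalTimer());
    return log;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

Note that in this sketch every set() and every firing touches the whole
collection, and the user code has to maintain the bookkeeping itself; that
is the kind of overhead and complexity dynamic timers would take off the
user's hands.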

> I'm a little afraid that letting users define data-driven timers
> might be too restrictive for some runners. Yes, runners that don't have
> this option would probably be able to resort to the logic described above,
> but if this work could be reasonably done for all runners, then we wouldn't
> force runners to actually implement it. And the API could look as if the
> timers were actually dynamic.
>
> Jan
>
> P.S. If dynamic timers (and therefore any number of timers) can actually
> be implemented using a single timer, that might be an interesting pattern,
> because a single timer per (window, key) has many nice properties; for
> example, it implicitly avoids the situation where timer invocation is not
> ordered ([BEAM-7520]), which seems to be an issue for multiple runners
> (samza, portable flink).
>
BEAM-7520 is simply an implementation bug. I don't think it makes sense to
fix a bug by restricting the model.


> On 10/22/19 6:52 PM, Reuven Lax wrote:
>
> Kenn:
> +1 to using TimerFamily instead of TimerId and TimerMap.
>
> Jan:
> This is definitely not just for DSLs. I've definitely seen cases where the
> user wants different timers based on input data, so they cannot be defined
> statically. As a thought experiment: one stated goal of state + timers was
> to provide the low-level tools we use to implement windowing. However to
> implement windowing you need a dynamic set of timers, not just a single
> one. Now most users don't need to reimplement windowing (though we have had
> some users who had that need, when they wanted something slightly different
> than what native Beam windowing provided), however the need for dynamic
> timers is not unheard of.
>
> +1 to allowing dynamic state. However I think this is separate enough from
> timers that it doesn't need to be coupled in this discussion. Dynamic state
> also raises the wrinkle of pipeline compatibility (as you mentioned),
> which I think is a bit less of an issue for dynamic timers.
>
> Allowing a DSL to specify a DoFnSignature does not quite solve this
> problem. The DSL still needs a way to set and process the timers. It also
> does not solve the problem where the timers are based on input data
> elements, so cannot be known at pipeline construction time. However what
> might be more important is statically defining the timer families, and a
> DSL could do this by specifying a DoFnSignature (and something similar
> could be done with state). Also as mentioned above, this is useful to
> normal Beam users as well, and we shouldn't force normal users to start
> dealing with DoFnSignatures and DoFnInvokers.
>
>
>
>
>
>
> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Max,
>>
>> wouldn't that be actually the same as
>>
>> class MyDoFn extends DoFn<String, String> {
>>
>>
>>    @ProcessElement
>>    public void process(
>>        ProcessContext context) {
>>      // "get" would register a new TimerSpec
>>      Timer timer1 = context.getTimer("timer1");
>>      Timer timer2 = context.getTimer("timer2");
>>      timer1.set(...);
>>      timer2.set(...);
>>    }
>>
>> That is - no need to declare anything? One more concern about that - if
>> we allow registration of timers (or even state) dynamically like that it
>> might be harder to perform validation of pipeline upon upgrades.
>>
>> Jan
>>
>> On 10/22/19 4:47 PM, Maximilian Michels wrote:
>> > The idea makes sense to me. I really like that Beam gives upfront
>> > specs for timer and state, but it is not flexible enough for
>> > timer-based libraries or for users which want to dynamically generate
>> > timers.
>> >
>> > I'm not sure about the proposed API yet. Shouldn't we separate the
>> > timer specs from setting actual timers?
>> >
>> > Suggestion:
>> >
>> > class MyDoFn extends DoFn<String, String> {
>> >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>> >
>> >   @ProcessElement
>> >   public void process(
>> >       @Element String e,
>> >       @TimerMap TimerMap timers) {
>> >     // "get" would register a new TimerSpec
>> >     Timer timer1 = timers.get("timer1");
>> >     Timer timer2 = timers.get("timer2");
>> >     timer1.set(...);
>> >     timer2.set(...);
>> >   }
>> >
>> >   // No args for "@OnTimer" => use generic TimerMap
>> >   @OnTimer
>> >   public void onTimer(
>> >       @TimerId String timerFired,
>> >       @Timestamp Instant timerTs,
>> >       @TimerMap TimerMap timers) {
>> >      // Timer firing
>> >      ...
>> >      // Set this timer (or another)
>> >      Timer timer = timers.get(timerFired);
>> >      timer.set(...);
>> >   }
>> > }
>> >
>> > What do you think?
>> >
>> > -Max
>> >
>> > On 22.10.19 10:35, Jan Lukavský wrote:
>> >> Hi Kenn,
>> >>
>> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>> >>> This seems extremely useful.
>> >>>
>> >>> I assume you mean `@OnTimer("timers")` in your example. I would
>> >>> suggest that the parameter annotation be something other
>> >>> than @TimerId since that annotation is already used for a very
>> >>> similar but different purpose; they are close enough that it is
>> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
>> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>> >>> keep @TimerId in the parameter list and change the declaration
>> >>> to @TimerFamily("timers"). I think "family" or "group" may be more
>> >>> clear naming than "map".
>> >>>
>> >>> At the portability level, this API does seem to be pretty close to a
>> >>> noop in terms of the messages that needs to be sent over the Fn API,
>> >>> so it makes sense to loosen the protos. By the time the Fn API is in
>> >>> play, all of our desires to catch errors prior to execution are
>> >>> irrelevant anyhow.
>> >>>
>> >>> On the other hand, I think DSLs have a different & bigger problem
>> >>> than this, in that they want to programmatically adjust all the
>> >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
>> >>> another. Certainly some limited DSL use cases are addressed by this,
>> >>> but I wouldn't take that as a primary use case for this feature.
>> >>> Ultimately they are probably better served by being able to
>> >>> explicitly author a DoFnInvoker and provide it to a variant of
>> >>> beam:transforms:ParDo where the do_fn field is a serialized
>> >>> DoFnInvoker. Now that I think about this, I cannot recall why we
>> >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
>> >>> That would allow maximum flexibility in utilizing the portability
>> >>> framework.
>> >>
>> >> yes, exactly, but when DSLs are in question, we have to make sure
>> >> that DSLs are not bound to portability - we have to be able to
>> >> translate even in case of "legacy" runners as well. That might
>> >> complicate things a bit maybe.
>> >>
>> >> Jan
>> >>
>> >>>
>> >>> Kenn
>> >>>
>> >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <relax@google.com
>> >>> <ma...@google.com>> wrote:
>> >>>
>> >>>     BEAM-6857 documents the need for dynamic timer support in the Beam
>> >>>     API. I wanted to make a proposal for what this API would look
>> >>>     like, and how to express it in the portability protos.
>> >>>
>> >>>     Background: Today Beam (especially BeamJava) requires a ParDo to
>> >>>     statically declare all timers it accesses at compile time. For
>> >>>     example:
>> >>>
>> >>>     class MyDoFn extends DoFn<String, String> {
>> >>>       @TimerId("timer1") TimerSpec timer1 =
>> >>>     TimerSpecs.timer(TimeDomain(EVENT_TIME));
>> >>>       @TimerId("timer2") TimerSpec timer2 =
>> >>>     TimerSpecs.timer(TimeDomain(PROCESSING_TIME));
>> >>>
>> >>>       @ProcessElement
>> >>>       public void process(@Element String e, @TimerId("timer1") Timer
>> >>>     timer1, @TimerId("timer2") Timer timer2)) {
>> >>>         timer1.set(...);
>> >>>         timer2.set(...);
>> >>>       }
>> >>>
>> >>>       @OnTimer("timer1") public void onTimer1() { ... }
>> >>>       @OnTimer("timer2") public void onTimer2() { ... }
>> >>>     }
>> >>>
>> >>>     This requires the author of a ParDo to know the full list of
>> >>>     timers ahead of time, which has been problematic in many cases.
>> >>>     One example where it causes issues is for DSLs such as Euphoria or
>> >>>     Scio. DSL authors usually write ParDos to interpret the code
>> >>>     written in the high-level DSL, and so don't know ahead of time the
>> >>>     list of timers needed; alternatives today are quite ugly: physical
>> >>>     code generation or creating a single timer that multiplexes all of
>> >>>     the users logical timers. There are also cases where a ParDo needs
>> >>>     multiple distinct timers, but the set of distinct timers is
>> >>>     controlled by the input data, and therefore not knowable in
>> >>>     advance. The Beam timer API has been insufficient for these use
>> >>> cases.
>> >>>
>> >>>     I propose a new TimerMap construct, which allow a ParDo to
>> >>>     dynamically set named timers. It's use in the Java API would look
>> >>>     as follows:
>> >>>
>> >>>     class MyDoFn extends DoFn<String, String> {
>> >>>       @TimerId("timers") TimerSpec timers =
>> >>>     TimerSpecs.timerMap(TimeDomain(EVENT_TIME));
>> >>>
>> >>>       @ProcessElement
>> >>>       public void process(@Element String e, @TimerId("timers")
>> >>>     TimerMap timer)) {
>> >>>         timers.set("timer1", ...);
>> >>>         timers.set("timer2", ...);
>> >>>       }
>> >>>
>> >>>       @OnTimer("timer") public void onTimer(@TimerId String
>> >>>     timerFired, @Timestamp Instant timerTs) { ... }
>> >>>     }
>> >>>
>> >>>     There is a new TimerSpec type to specify a TimerMap. The TimerMap
>> >>>     class itself allows dynamically setting multiple timers based on a
>> >>>     String tag argument. Each TimerMap has a single callback which
>> >>>     when called is given the id of the timer that is currently firing.
>> >>>
>> >>>     It is allowed to have multiple TimerMap objects in a ParDo (and
>> >>>     required if you want to have both processing-time and event-time
>> >>>     timers in the same ParDo). Each TimerMap is its own logical
>> >>>     namespace. i.e. if the user sets timers with the same string tag
>> >>>     on different TimerMap objects the timers will not collide.
>> >>>
>> >>>     Currently the portability protos were written to mirror the Java
>> >>>     API, expecting one TimerSpec per timer accessed by the ParDo. I
>> >>>     suggest that we instead make TimerMap the default for portability,
>> >>>     and model the current behavior on top of timer map. If this proves
>> >>>     problematic for some runners, we could instead introduce a new
>> >>>     TimerSpec proto to represent TimerMap.
>> >>>
>> >>>     Thoughts?
>> >>>
>> >>>     Reuven
>> >>>
>>
>

Re: Proposal: Dynamic timer support (BEAM-6857)

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Reuven,

Yes, if this change is intended to be used by end users, then
DoFnSignatures cannot be used; I agree on that. Regarding the relationship
with dynamic state - I agree that this is a separate problem, but because
it is close enough, we should know how we want to deal with that.
Because state and timers share some functionality (after all, timers need
state to be fault tolerant), these APIs should IMO share the same logic.
Whatever solution is chosen to expose dynamic timers, it should extend to
dynamic state.

I'd like to pause for a moment on the premise that users want dynamic
timers (that is, timers whose *name* - and therefore behavior - is
determined by incoming data). Could this case be modeled so that the
timer has one more (implicit) state variable that holds a collection of
tuples (timestamp, name)? The timer would then be invoked at the minimum
of all currently set timestamps, with the respective name. The question
here probably is - can this have a performance impact? That is to say -
can any runner actually do anything different from this in terms of the
time complexity of the algorithm?
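
A minimal sketch of the scheme described above, written in plain Java
rather than against Beam APIs. The class MultiplexedTimer and its methods
are hypothetical names used only for illustration: the two maps stand in
for the implicit state variable holding (timestamp, name) tuples, and
nextFiring() is the timestamp the single physical timer would be set to.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical illustration only; this is not a Beam class. */
class MultiplexedTimer {
    // Implicit state: logical timer name -> timestamp it is set for.
    private final Map<String, Long> byName = new HashMap<>();
    // The same tuples ordered by timestamp, so the minimum is cheap to find.
    private final TreeMap<Long, TreeSet<String>> byTime = new TreeMap<>();

    /** Sets (or resets) a logical timer. */
    void set(String name, long timestamp) {
        clear(name);
        byName.put(name, timestamp);
        byTime.computeIfAbsent(timestamp, t -> new TreeSet<>()).add(name);
    }

    /** Clears a logical timer if it is currently set. */
    void clear(String name) {
        Long old = byName.remove(name);
        if (old == null) {
            return;
        }
        TreeSet<String> names = byTime.get(old);
        names.remove(name);
        if (names.isEmpty()) {
            byTime.remove(old);
        }
    }

    /** Timestamp for the single physical timer, or null if nothing is set. */
    Long nextFiring() {
        return byTime.isEmpty() ? null : byTime.firstKey();
    }

    /** On firing of the physical timer: names due at or before now. */
    List<String> fire(long now) {
        List<String> due = new ArrayList<>();
        while (!byTime.isEmpty() && byTime.firstKey() <= now) {
            for (String name : byTime.pollFirstEntry().getValue()) {
                byName.remove(name);
                due.add(name);
            }
        }
        return due;
    }
}
```

In this sketch every set or clear costs a logarithmic-time update of the
ordered map, plus whatever it costs to persist that state; the sketch says
nothing about how a real runner stores timers.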

I'm a little afraid that actually letting users define data-driven timers
might be too restrictive for some runners. Yes, runners that don't
have this option would probably be able to resort to the logic described
above, but if this work could reasonably be done for all runners, then
we wouldn't force runners to actually implement it. And the API could
look as if the timers were actually dynamic.

Jan

P.S. If a dynamic number (and therefore any number) of timers can actually
be implemented using a single timer, that might be an interesting pattern,
because a single timer per (window, key) has many nice properties: for
example, it implicitly avoids the situation where timer invocation is not
ordered ([BEAM-7520]), which seems to be an issue for multiple runners
(samza, portable flink).

On 10/22/19 6:52 PM, Reuven Lax wrote:
> Kenn:
> +1 to using TimerFamily instead of TimerId and TimerMap.
>
> Jan:
> This is definitely not just for DSLs. I've definitely seen cases where 
> the user wants different timers based on input data, so they cannot be 
> defined statically. As a thought experiment: one stated goal of 
> state + timers was to provide the low-level tools we use to implement 
> windowing. However to implement windowing you need a dynamic set of 
> timers, not just a single one. Now most users don't need to 
> reimplement windowing (though we have had some users who had that 
> need, when they wanted something slightly different than what native 
> Beam windowing provided), however the need for dynamic timers is not 
> unheard of.
>
> +1 to allowing dynamic state. However I think this is separate enough 
> from timers that it doesn't need to be coupled in this discussion. 
> Dynamic state also raises the wrinkle of pipeline compatibility (as 
> you mentioned), which I think is a bit less of an issue for dynamic 
> timers.
>
> Allowing a DSL to specify a DoFnSignature does not quite solve this 
> problem. The DSL still needs a way to set and process the timers. It 
> also does not solve the problem where the timers are based on input 
> data elements, so cannot be known at pipeline construction time. 
> However what might be more important is statically defining the timer 
> families, and a DSL could do this by specifying a DoFnSignature (and 
> something similar could be done with state). Also as mentioned above, 
> this is useful to normal Beam users as well, and we shouldn't force 
> normal users to start dealing with DoFnSignatures and DoFnInvokers.
>
>
>
>
>
>
> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je.ik@seznam.cz 
> <ma...@seznam.cz>> wrote:
>
>     Hi Max,
>
>     wouldn't that be actually the same as
>
>     class MyDoFn extends DoFn<String, String> {
>
>
>        @ProcessElement
>        public void process(
>            ProcessContext context) {
>          // "get" would register a new TimerSpec
>          Timer timer1 = context.getTimer("timer1");
>          Timer timer2 = context.getTimer("timer2");
>          timer1.set(...);
>          timer2.set(...);
>        }
>
>     That is - no need to declare anything? One more concern about that
>     - if
>     we allow registration of timers (or even state) dynamically like
>     that it
>     might be harder to perform validation of pipeline upon upgrades.
>
>     Jan
>
>     On 10/22/19 4:47 PM, Maximilian Michels wrote:
>     > The idea makes sense to me. I really like that Beam gives upfront
>     > specs for timer and state, but it is not flexible enough for
>     > timer-based libraries or for users which want to dynamically
>     generate
>     > timers.
>     >
>     > I'm not sure about the proposed API yet. Shouldn't we separate the
>     > timer specs from setting actual timers?
>     >
>     > Suggestion:
>     >
>     > class MyDoFn extends DoFn<String, String> {
>     >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>     >
>     >   @ProcessElement
>     >   public void process(
>     >       @Element String e,
>     >       @TimerMap TimerMap timers) {
>     >     // "get" would register a new TimerSpec
>     >     Timer timer1 = timers.get("timer1");
>     >     Timer timer2 = timers.get("timer2");
>     >     timer1.set(...);
>     >     timer2.set(...);
>     >   }
>     >
>     >   // No args for "@OnTimer" => use generic TimerMap
>     >   @OnTimer
>     >   public void onTimer(
>     >       @TimerId String timerFired,
>     >       @Timestamp Instant timerTs,
>     >       @TimerMap TimerMap timers) {
>     >      // Timer firing
>     >      ...
>     >      // Set this timer (or another)
>     >      Timer timer = timers.get(timerFired);
>     >      timer.set(...);
>     >   }
>     > }
>     >
>     > What do you think?
>     >
>     > -Max
>     >
>     > On 22.10.19 10:35, Jan Lukavský wrote:
>     >> Hi Kenn,
>     >>
>     >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>     >>> This seems extremely useful.
>     >>>
>     >>> I assume you mean `@OnTimer("timers")` in your example. I would
>     >>> suggest that the parameter annotation be something other
>     >>> than @TimerId since that annotation is already used for a very
>     >>> similar but different purpose; they are close enough that it is
>     >>> tempting to pun them, but it is clearer to keep them distinct
>     IMO.
>     >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>     >>> keep @TimerId in the parameter list and change the declaration
>     >>> to @TimerFamily("timers"). I think "family" or "group" may be
>     more
>     >>> clear naming than "map".
>     >>>
>     >>> At the portability level, this API does seem to be pretty
>     close to a
>     >>> noop in terms of the messages that needs to be sent over the
>     Fn API,
>     >>> so it makes sense to loosen the protos. By the time the Fn API
>     is in
>     >>> play, all of our desires to catch errors prior to execution are
>     >>> irrelevant anyhow.
>     >>>
>     >>> On the other hand, I think DSLs have a different & bigger problem
>     >>> than this, in that they want to programmatically adjust all the
>     >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
>     >>> another. Certainly some limited DSL use cases are addressed by
>     this,
>     >>> but I wouldn't take that as a primary use case for this feature.
>     >>> Ultimately they are probably better served by being able to
>     >>> explicitly author a DoFnInvoker and provide it to a variant of
>     >>> beam:transforms:ParDo where the do_fn field is a serialized
>     >>> DoFnInvoker. Now that I think about this, I cannot recall why we
>     >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
>     >>> That would allow maximum flexibility in utilizing the portability
>     >>> framework.
>     >>
>     >> yes, exactly, but when DSLs are in question, we have to make sure
>     >> that DSLs are not bound to portability - we have to be able to
>     >> translate even in case of "legacy" runners as well. That might
>     >> complicate things a bit maybe.
>     >>
>     >> Jan
>     >>
>     >>>
>     >>> Kenn
>     >>>
>     >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <relax@google.com
>     <ma...@google.com>
>     >>> <mailto:relax@google.com <ma...@google.com>>> wrote:
>     >>>
>     >>>     BEAM-6857 documents the need for dynamic timer support in
>     the Beam
>     >>>     API. I wanted to make a proposal for what this API would look
>     >>>     like, and how to express it in the portability protos.
>     >>>
>     >>>     Background: Today Beam (especially BeamJava) requires a
>     ParDo to
>     >>>     statically declare all timers it accesses at compile time. For
>     >>>     example:
>     >>>
>     >>>     class MyDoFn extends DoFn<String, String> {
>     >>>       @TimerId("timer1") TimerSpec timer1 =
>     >>>     TimerSpecs.timer(TimeDomain(EVENT_TIME));
>     >>>       @TimerId("timer2") TimerSpec timer2 =
>     >>> TimerSpecs.timer(TimeDomain(PROCESSING_TIME));
>     >>>
>     >>>       @ProcessElement
>     >>>       public void process(@Element String
>     e, @TimerId("timer1") Timer
>     >>>     timer1, @TimerId("timer2") Timer timer2)) {
>     >>>         timer1.set(...);
>     >>>         timer2.set(...);
>     >>>       }
>     >>>
>     >>>       @OnTimer("timer1") public void onTimer1() { ... }
>     >>>       @OnTimer("timer2") public void onTimer2() { ... }
>     >>>     }
>     >>>
>     >>>     This requires the author of a ParDo to know the full list of
>     >>>     timers ahead of time, which has been problematic in many
>     cases.
>     >>>     One example where it causes issues is for DSLs such as
>     Euphoria or
>     >>>     Scio. DSL authors usually write ParDos to interpret the code
>     >>>     written in the high-level DSL, and so don't know ahead of
>     time the
>     >>>     list of timers needed; alternatives today are quite ugly:
>     physical
>     >>>     code generation or creating a single timer that
>     multiplexes all of
>     >>>     the users logical timers. There are also cases where a
>     ParDo needs
>     >>>     multiple distinct timers, but the set of distinct timers is
>     >>>     controlled by the input data, and therefore not knowable in
>     >>>     advance. The Beam timer API has been insufficient for
>     these use
>     >>> cases.
>     >>>
>     >>>     I propose a new TimerMap construct, which allow a ParDo to
>     >>>     dynamically set named timers. It's use in the Java API
>     would look
>     >>>     as follows:
>     >>>
>     >>>     class MyDoFn extends DoFn<String, String> {
>     >>>       @TimerId("timers") TimerSpec timers =
>     >>>     TimerSpecs.timerMap(TimeDomain(EVENT_TIME));
>     >>>
>     >>>       @ProcessElement
>     >>>       public void process(@Element String e, @TimerId("timers")
>     >>>     TimerMap timer)) {
>     >>>         timers.set("timer1", ...);
>     >>>         timers.set("timer2", ...);
>     >>>       }
>     >>>
>     >>>       @OnTimer("timer") public void onTimer(@TimerId String
>     >>>     timerFired, @Timestamp Instant timerTs) { ... }
>     >>>     }
>     >>>
>     >>>     There is a new TimerSpec type to specify a TimerMap. The
>     TimerMap
>     >>>     class itself allows dynamically setting multiple timers
>     based on a
>     >>>     String tag argument. Each TimerMap has a single callback which
>     >>>     when called is given the id of the timer that is currently
>     firing.
>     >>>
>     >>>     It is allowed to have multiple TimerMap objects in a ParDo
>     (and
>     >>>     required if you want to have both processing-time and
>     event-time
>     >>>     timers in the same ParDo). Each TimerMap is its own logical
>     >>>     namespace. i.e. if the user sets timers with the same
>     string tag
>     >>>     on different TimerMap objects the timers will not collide.
>     >>>
>     >>>     Currently the portability protos were written to mirror
>     the Java
>     >>>     API, expecting one TimerSpec per timer accessed by the
>     ParDo. I
>     >>>     suggest that we instead make TimerMap the default for
>     portability,
>     >>>     and model the current behavior on top of timer map. If
>     this proves
>     >>>     problematic for some runners, we could instead introduce a new
>     >>>     TimerSpec proto to represent TimerMap.
>     >>>
>     >>>     Thoughts?
>     >>>
>     >>>     Reuven
>     >>>
>

Re: Proposal: Dynamic timer support (BEAM-6857)

Posted by Reuven Lax <re...@google.com>.
This is a bit of a digression, but at least for the Dataflow runner the
only way to implement that would be to add an extra ValueState per timer
(which would add overhead). Timers were implemented based around blind
writes, and weren't designed for point reads. However in some cases the
ability to have dynamic timers actually solves use cases which otherwise
would need this functionality.
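
A rough sketch of the trade-off described above, in plain Java rather than
Beam or Dataflow code. Everything here is a hypothetical stand-in:
runnerTimers plays the role of a timer store that is only written blindly,
and shadowState plays the role of the extra per-timer value state that
would make isSet() / read() / setMinimum() possible.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration only; these are not Beam classes. */
class ReadableTimers {
    // Stand-in for the runner's timer store: written blindly, never read here.
    private final Map<String, Long> runnerTimers = new HashMap<>();
    // Extra per-timer value state mirroring the last timestamp written.
    private final Map<String, Long> shadowState = new HashMap<>();

    void set(String name, long timestamp) {
        runnerTimers.put(name, timestamp); // blind write, no read required
        shadowState.put(name, timestamp);  // additional state write (the overhead)
    }

    boolean isSet(String name) {
        return shadowState.containsKey(name);
    }

    /** Returns the timestamp the timer is set for, or null if it is not set. */
    Long read(String name) {
        return shadowState.get(name);
    }

    /** Moves the timer earlier only if the new timestamp precedes the current one. */
    void setMinimum(String name, long timestamp) {
        Long current = shadowState.get(name); // point read served from state
        if (current == null || timestamp < current) {
            set(name, timestamp);
        }
    }

    /** Called when the timer fires; drops the mirrored value. */
    void onFired(String name) {
        runnerTimers.remove(name);
        shadowState.remove(name);
    }
}
```

Every set in this sketch performs two writes instead of one, which is the
overhead mentioned above; the reads never touch the timer store itself.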

On Tue, Oct 22, 2019 at 4:56 PM Reza Rokni <re...@google.com> wrote:

> +1 on this, having the ability to create timers based on data would make a
> bunch of use cases easier to write.
>
> Any thoughts on having a isSet() / read() / setMinimum(timeStamp) type
> ability?
>
> On Wed, 23 Oct 2019 at 00:52, Reuven Lax <re...@google.com> wrote:
>
>> Kenn:
>> +1 to using TimerFamily instead of TimerId and TimerMap.
>>
>> Jan:
>> This is definitely not just for DSLs. I've definitely seen cases where
>> the user wants different timers based on input data, so they cannot be
>> defined statically. As a thought experiment: one stated goal of state +
>> timers was to provide the low-level tools we use to implement windowing.
>> However to implement windowing you need a dynamic set of timers, not just a
>> single one. Now most users don't need to reimplement windowing (though we
>> have had some users who had that need, when they wanted something slightly
>> different than what native Beam windowing provided), however the need for
>> dynamic timers is not unheard of.
>>
>> +1 to allowing dynamic state. However I think this is separate enough
>> from timers that it doesn't need to be coupled in this discussion. Dynamic
>> state also raises the wrinkle of pipeline compatibility (as you mentioned),
>> which I think is a bit less of an issue for dynamic timers.
>>
>> Allowing a DSL to specify a DoFnSignature does not quite solve this
>> problem. The DSL still needs a way to set and process the timers. It also
>> does not solve the problem where the timers are based on input data
>> elements, so cannot be known at pipeline construction time. However what
>> might be more important is statically defining the timer families, and a
>> DSL could do this by specifying a DoFnSignature (and something similar
>> could be done with state). Also as mentioned above, this is useful to
>> normal Beam users as well, and we shouldn't force normal users to start
>> dealing with DoFnSignatures and DoFnInvokers.
>>
>>
>>
>>
>>
>>
>> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Max,
>>>
>>> wouldn't that be actually the same as
>>>
>>> class MyDoFn extends DoFn<String, String> {
>>>
>>>
>>>    @ProcessElement
>>>    public void process(
>>>        ProcessContext context) {
>>>      // "get" would register a new TimerSpec
>>>      Timer timer1 = context.getTimer("timer1");
>>>      Timer timer2 = context.getTimer("timer2");
>>>      timer1.set(...);
>>>      timer2.set(...);
>>>    }
>>>
>>> That is - no need to declare anything? One more concern about that - if
>>> we allow registration of timers (or even state) dynamically like that it
>>> might be harder to perform validation of pipeline upon upgrades.
>>>
>>> Jan
>>>
>>> On 10/22/19 4:47 PM, Maximilian Michels wrote:
>>> > The idea makes sense to me. I really like that Beam gives upfront
>>> > specs for timer and state, but it is not flexible enough for
>>> > timer-based libraries or for users which want to dynamically generate
>>> > timers.
>>> >
>>> > I'm not sure about the proposed API yet. Shouldn't we separate the
>>> > timer specs from setting actual timers?
>>> >
>>> > Suggestion:
>>> >
>>> > class MyDoFn extends DoFn<String, String> {
>>> >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>>> >
>>> >   @ProcessElement
>>> >   public void process(
>>> >       @Element String e,
>>> >       @TimerMap TimerMap timers) {
>>> >     // "get" would register a new TimerSpec
>>> >     Timer timer1 = timers.get("timer1");
>>> >     Timer timer2 = timers.get("timer2");
>>> >     timer1.set(...);
>>> >     timer2.set(...);
>>> >   }
>>> >
>>> >   // No args for "@OnTimer" => use generic TimerMap
>>> >   @OnTimer
>>> >   public void onTimer(
>>> >       @TimerId String timerFired,
>>> >       @Timestamp Instant timerTs,
>>> >       @TimerMap TimerMap timers) {
>>> >      // Timer firing
>>> >      ...
>>> >      // Set this timer (or another)
>>> >      Timer timer = timers.get(timerFired);
>>> >      timer.set(...);
>>> >   }
>>> > }
>>> >
>>> > What do you think?
>>> >
>>> > -Max
>>> >
>>> > On 22.10.19 10:35, Jan Lukavský wrote:
>>> >> Hi Kenn,
>>> >>
>>> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>>> >>> This seems extremely useful.
>>> >>>
>>> >>> I assume you mean `@OnTimer("timers")` in your example. I would
>>> >>> suggest that the parameter annotation be something other
>>> >>> than @TimerId since that annotation is already used for a very
>>> >>> similar but different purpose; they are close enough that it is
>>> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
>>> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>>> >>> keep @TimerId in the parameter list and change the declaration
>>> >>> to @TimerFamily("timers"). I think "family" or "group" may be more
>>> >>> clear naming than "map".
>>> >>>
>>> >>> At the portability level, this API does seem to be pretty close to a
>>> >>> noop in terms of the messages that needs to be sent over the Fn API,
>>> >>> so it makes sense to loosen the protos. By the time the Fn API is in
>>> >>> play, all of our desires to catch errors prior to execution are
>>> >>> irrelevant anyhow.
>>> >>>
>>> >>> On the other hand, I think DSLs have a different & bigger problem
>>> >>> than this, in that they want to programmatically adjust all the
>>> >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
>>> >>> another. Certainly some limited DSL use cases are addressed by this,
>>> >>> but I wouldn't take that as a primary use case for this feature.
>>> >>> Ultimately they are probably better served by being able to
>>> >>> explicitly author a DoFnInvoker and provide it to a variant of
>>> >>> beam:transforms:ParDo where the do_fn field is a serialized
>>> >>> DoFnInvoker. Now that I think about this, I cannot recall why we
>>> >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
>>> >>> That would allow maximum flexibility in utilizing the portability
>>> >>> framework.
>>> >>
>>> >> yes, exactly, but when DSLs are in question, we have to make sure
>>> >> that DSLs are not bound to portability - we have to be able to
>>> >> translate even in case of "legacy" runners as well. That might
>>> >> complicate things a bit maybe.
>>> >>
>>> >> Jan
>>> >>
>>> >>>
>>> >>> Kenn
>>> >>>
>>> >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <relax@google.com
>>> >>> <ma...@google.com>> wrote:
>>> >>>
>>> >>>     BEAM-6857 documents the need for dynamic timer support in the
>>> Beam
>>> >>>     API. I wanted to make a proposal for what this API would look
>>> >>>     like, and how to express it in the portability protos.
>>> >>>
>>> >>>     Background: Today Beam (especially BeamJava) requires a ParDo to
>>> >>>     statically declare all timers it accesses at compile time. For
>>> >>>     example:
>>> >>>
>>> >>>     class MyDoFn extends DoFn<String, String> {
>>> >>>       @TimerId("timer1") TimerSpec timer1 =
>>> >>>     TimerSpecs.timer(TimeDomain(EVENT_TIME));
>>> >>>       @TimerId("timer2") TimerSpec timer2 =
>>> >>>     TimerSpecs.timer(TimeDomain(PROCESSING_TIME));
>>> >>>
>>> >>>       @ProcessElement
>>> >>>       public void process(@Element String e, @TimerId("timer1") Timer
>>> >>>     timer1, @TimerId("timer2") Timer timer2)) {
>>> >>>         timer1.set(...);
>>> >>>         timer2.set(...);
>>> >>>       }
>>> >>>
>>> >>>       @OnTimer("timer1") public void onTimer1() { ... }
>>> >>>       @OnTimer("timer2") public void onTimer2() { ... }
>>> >>>     }
>>> >>>
>>> >>>     This requires the author of a ParDo to know the full list of
>>> >>>     timers ahead of time, which has been problematic in many cases.
>>> >>>     One example where it causes issues is for DSLs such as Euphoria
>>> or
>>> >>>     Scio. DSL authors usually write ParDos to interpret the code
>>> >>>     written in the high-level DSL, and so don't know ahead of time
>>> the
>>> >>>     list of timers needed; alternatives today are quite ugly:
>>> physical
>>> >>>     code generation or creating a single timer that multiplexes all
>>> of
>>> >>>     the users logical timers. There are also cases where a ParDo
>>> needs
>>> >>>     multiple distinct timers, but the set of distinct timers is
>>> >>>     controlled by the input data, and therefore not knowable in
>>> >>>     advance. The Beam timer API has been insufficient for these use
>>> >>> cases.
>>> >>>
>>> >>>     I propose a new TimerMap construct, which allow a ParDo to
>>> >>>     dynamically set named timers. It's use in the Java API would look
>>> >>>     as follows:
>>> >>>
>>> >>>     class MyDoFn extends DoFn<String, String> {
>>> >>>       @TimerId("timers") TimerSpec timers =
>>> >>>     TimerSpecs.timerMap(TimeDomain(EVENT_TIME));
>>> >>>
>>> >>>       @ProcessElement
>>> >>>       public void process(@Element String e, @TimerId("timers")
>>> >>>     TimerMap timer)) {
>>> >>>         timers.set("timer1", ...);
>>> >>>         timers.set("timer2", ...);
>>> >>>       }
>>> >>>
>>> >>>       @OnTimer("timer") public void onTimer(@TimerId String
>>> >>>     timerFired, @Timestamp Instant timerTs) { ... }
>>> >>>     }
>>> >>>
>>> >>>     There is a new TimerSpec type to specify a TimerMap. The TimerMap
>>> >>>     class itself allows dynamically setting multiple timers based on
>>> >>>     a String tag argument. Each TimerMap has a single callback which
>>> >>>     when called is given the id of the timer that is currently
>>> >>>     firing.
>>> >>>
>>> >>>     It is allowed to have multiple TimerMap objects in a ParDo (and
>>> >>>     required if you want to have both processing-time and event-time
>>> >>>     timers in the same ParDo). Each TimerMap is its own logical
>>> >>>     namespace. i.e. if the user sets timers with the same string tag
>>> >>>     on different TimerMap objects the timers will not collide.
>>> >>>
>>> >>>     Currently the portability protos were written to mirror the Java
>>> >>>     API, expecting one TimerSpec per timer accessed by the ParDo. I
>>> >>>     suggest that we instead make TimerMap the default for
>>> >>>     portability, and model the current behavior on top of timer map.
>>> >>>     If this proves problematic for some runners, we could instead
>>> >>>     introduce a new TimerSpec proto to represent TimerMap.
>>> >>>
>>> >>>     Thoughts?
>>> >>>
>>> >>>     Reuven
>>> >>>
>>>
>>
>
> --
>
> This email may be confidential and privileged. If you received this
> communication by mistake, please don't forward it to anyone else, please
> erase all copies and attachments, and please let me know that it has gone
> to the wrong person.
>
> The above terms reflect a potential business arrangement, are provided
> solely as a basis for further discussion, and are not intended to be and do
> not constitute a legally binding obligation. No legally binding obligations
> will be created, implied, or inferred until an agreement in final form is
> executed in writing by all parties involved.
>

Re: Proposal: Dynamic timer support (BEAM-6857)

Posted by Reza Rokni <re...@google.com>.
+1 on this. Having the ability to create timers based on data would make a
bunch of use cases easier to write.

Any thoughts on also having an isSet() / read() / setMinimum(timeStamp) type
ability?
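
To make the question concrete, here is a minimal sketch of one possible
reading of those three operations. It is plain Java over an in-memory field,
not the Beam Timer API; the class name MinTimerSketch and the exact semantics
(setMinimum keeps the earlier of the existing and the new timestamp, read
returns empty when nothing is set) are assumptions for illustration only.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical stand-in for a single timer; NOT the Beam Timer API.
final class MinTimerSketch {
  // Current firing time; null means the timer is not set.
  private Instant target;

  // True if a firing time is currently recorded.
  boolean isSet() {
    return target != null;
  }

  // Returns the current firing time, or empty if the timer is not set.
  Optional<Instant> read() {
    return Optional.ofNullable(target);
  }

  // Unconditionally overwrites the firing time.
  void set(Instant ts) {
    target = ts;
  }

  // Assumed semantics: keep whichever of the old and new times is earlier.
  void setMinimum(Instant ts) {
    if (target == null || ts.isBefore(target)) {
      target = ts;
    }
  }
}
```

With this reading, calling setMinimum with 20s, then 10s, then 30s leaves the
timer at 10s, which is the "fire at the earliest requested time" pattern that
otherwise needs a separate state cell to track.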

On Wed, 23 Oct 2019 at 00:52, Reuven Lax <re...@google.com> wrote:

> Kenn:
> +1 to using TimerFamily instead of TimerId and TimerMap.
>
> Jan:
> This is definitely not just for DSLs. I've definitely seen cases where the
> user wants different timers based on input data, so they cannot be defined
> statically. As a thought experiment: one stated goal of state + timers was
> to provide the low-level tools we use to implement windowing. However to
> implement windowing you need a dynamic set of timers, not just a single
> one. Now most users don't need to reimplement windowing (though we have had
> some users who had that need, when they wanted something slightly different
> than what native Beam windowing provided), however the need for dynamic
> timers is not unheard of.
>
> +1 to allowing dynamic state. However I think this is separate enough from
> timers that it doesn't need to be coupled in this discussion. Dynamic state
> also raises the wrinkle of pipeline compatibility (as you mentioned),
> which I think is a bit less of an issue for dynamic timers.
>
> Allowing a DSL to specify a DoFnSignature does not quite solve this
> problem. The DSL still needs a way to set and process the timers. It also
> does not solve the problem where the timers are based on input data
> elements, so cannot be known at pipeline construction time. However what
> might be more important is statically defining the timer families, and a
> DSL could do this by specifying a DoFnSignature (and something similar
> could be done with state). Also as mentioned above, this is useful to
> normal Beam users as well, and we shouldn't force normal users to start
> dealing with DoFnSignatures and DoFnInvokers.
>
>
>
>
>
>
> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Max,
>>
>> wouldn't that be actually the same as
>>
>> class MyDoFn extends DoFn<String, String> {
>>
>>
>>    @ProcessElement
>>    public void process(
>>        ProcessContext context) {
>>      // "get" would register a new TimerSpec
>>      Timer timer1 = context.getTimer("timer1");
>>      Timer timer2 = context.getTimer("timer2");
>>      timers.set(...);
>>      timers.set(...);
>>    }
>>
>> That is - no need to declare anything? One more concern about that - if
>> we allow registration of timers (or even state) dynamically like that it
>> might be harder to perform validation of pipeline upon upgrades.
>>
>> Jan
>>
>> On 10/22/19 4:47 PM, Maximilian Michels wrote:
>> > The idea makes sense to me. I really like that Beam gives upfront
>> > specs for timer and state, but it is not flexible enough for
>> > timer-based libraries or for users which want to dynamically generate
>> > timers.
>> >
>> > I'm not sure about the proposed API yet. Shouldn't we separate the
>> > timer specs from setting actual timers?
>> >
>> > Suggestion:
>> >
>> > class MyDoFn extends DoFn<String, String> {
>> >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>> >
>> >   @ProcessElement
>> >   public void process(
>> >       @Element String e,
>> >       @TimerMap TimerMap timers)) {
>> >     // "get" would register a new TimerSpec
>> >     Timer timer1 = timers.get("timer1");
>> >     Timer timer2 = timers.get("timer2");
>> >     timers.set(...);
>> >     timers.set(...);
>> >   }
>> >
>> >   // No args for "@OnTimer" => use generic TimerMap
>> >   @OnTimer
>> >   public void onTimer(
>> >       @TimerId String timerFired,
>> >       @Timestamp Instant timerTs,
>> >       @TimerMap TimerMap timers) {
>> >      // Timer firing
>> >      ...
>> >      // Set this timer (or another)
>> >      Timer timer = timers.get(timerFired);
>> >      timer.set(...);
>> >   }
>> > }
>> >
>> > What do you think?
>> >
>> > -Max
>> >
>> > On 22.10.19 10:35, Jan Lukavský wrote:
>> >> Hi Kenn,
>> >>
>> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>> >>> This seems extremely useful.
>> >>>
>> >>> I assume you mean `@OnTimer("timers")` in your example. I would
>> >>> suggest that the parameter annotation be something other
>> >>> than @TimerId since that annotation is already used for a very
>> >>> similar but different purpose; they are close enough that it is
>> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
>> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>> >>> keep @TimerId in the parameter list and change the declaration
>> >>> to @TimerFamily("timers"). I think "family" or "group" may be more
>> >>> clear naming than "map".
>> >>>
>> >>> At the portability level, this API does seem to be pretty close to a
>> >>> noop in terms of the messages that needs to be sent over the Fn API,
>> >>> so it makes sense to loosen the protos. By the time the Fn API is in
>> >>> play, all of our desires to catch errors prior to execution are
>> >>> irrelevant anyhow.
>> >>>
>> >>> On the other hand, I think DSLs have a different & bigger problem
>> >>> than this, in that they want to programmatically adjust all the
>> >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
>> >>> another. Certainly some limited DSL use cases are addressed by this,
>> >>> but I wouldn't take that as a primary use case for this feature.
>> >>> Ultimately they are probably better served by being able to
>> >>> explicitly author a DoFnInvoker and provide it to a variant of
>> >>> beam:transforms:ParDo where the do_fn field is a serialized
>> >>> DoFnInvoker. Now that I think about this, I cannot recall why we
>> >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
>> >>> That would allow maximum flexibility in utilizing the portability
>> >>> framework.
>> >>
>> >> yes, exactly, but when DSLs are in question, we have to make sure
>> >> that DSLs are not bound to portability - we have to be able to
>> >> translate even in case of "legacy" runners as well. That might
>> >> complicate things a bit maybe.
>> >>
>> >> Jan
>> >>
>> >>>
>> >>> Kenn
>> >>>
>> >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <relax@google.com
>> >>> <ma...@google.com>> wrote:
>> >>>
>> >>>     BEAM-6857 documents the need for dynamic timer support in the Beam
>> >>>     API. I wanted to make a proposal for what this API would look
>> >>>     like, and how to express it in the portability protos.
>> >>>
>> >>>     Background: Today Beam (especially BeamJava) requires a ParDo to
>> >>>     statically declare all timers it accesses at compile time. For
>> >>>     example:
>> >>>
>> >>>     class MyDoFn extends DoFn<String, String> {
>> >>>       @TimerId("timer1") TimerSpec timer1 =
>> >>>     TimerSpecs.timer(TimeDomain(EVENT_TIME));
>> >>>       @TimerId("timer2") TimerSpec timer2 =
>> >>>     TimerSpecs.timer(TimeDomain(PROCESSING_TIME));
>> >>>
>> >>>       @ProcessElement
>> >>>       public void process(@Element String e, @TimerId("timer1") Timer
>> >>>     timer1, @TimerId("timer2") Timer timer2)) {
>> >>>         timer1.set(...);
>> >>>         timer2.set(...);
>> >>>       }
>> >>>
>> >>>       @OnTimer("timer1") public void onTimer1() { ... }
>> >>>       @OnTimer("timer2") public void onTimer2() { ... }
>> >>>     }
>> >>>
>> >>>     This requires the author of a ParDo to know the full list of
>> >>>     timers ahead of time, which has been problematic in many cases.
>> >>>     One example where it causes issues is for DSLs such as Euphoria or
>> >>>     Scio. DSL authors usually write ParDos to interpret the code
>> >>>     written in the high-level DSL, and so don't know ahead of time the
>> >>>     list of timers needed; alternatives today are quite ugly: physical
>> >>>     code generation or creating a single timer that multiplexes all of
>> >>>     the users logical timers. There are also cases where a ParDo needs
>> >>>     multiple distinct timers, but the set of distinct timers is
>> >>>     controlled by the input data, and therefore not knowable in
>> >>>     advance. The Beam timer API has been insufficient for these
>> >>>     use cases.
>> >>>
>> >>>     I propose a new TimerMap construct, which allow a ParDo to
>> >>>     dynamically set named timers. It's use in the Java API would look
>> >>>     as follows:
>> >>>
>> >>>     class MyDoFn extends DoFn<String, String> {
>> >>>       @TimerId("timers") TimerSpec timers =
>> >>>     TimerSpecs.timerMap(TimeDomain(EVENT_TIME));
>> >>>
>> >>>       @ProcessElement
>> >>>       public void process(@Element String e, @TimerId("timers")
>> >>>     TimerMap timer)) {
>> >>>         timers.set("timer1", ...);
>> >>>         timers.set("timer2", ...);
>> >>>       }
>> >>>
>> >>>       @OnTimer("timer") public void onTimer(@TimerId String
>> >>>     timerFired, @Timestamp Instant timerTs) { ... }
>> >>>     }
>> >>>
>> >>>     There is a new TimerSpec type to specify a TimerMap. The TimerMap
>> >>>     class itself allows dynamically setting multiple timers based on a
>> >>>     String tag argument. Each TimerMap has a single callback which
>> >>>     when called is given the id of the timer that is currently firing.
>> >>>
>> >>>     It is allowed to have multiple TimerMap objects in a ParDo (and
>> >>>     required if you want to have both processing-time and event-time
>> >>>     timers in the same ParDo). Each TimerMap is its own logical
>> >>>     namespace. i.e. if the user sets timers with the same string tag
>> >>>     on different TimerMap objects the timers will not collide.
>> >>>
>> >>>     Currently the portability protos were written to mirror the Java
>> >>>     API, expecting one TimerSpec per timer accessed by the ParDo. I
>> >>>     suggest that we instead make TimerMap the default for portability,
>> >>>     and model the current behavior on top of timer map. If this proves
>> >>>     problematic for some runners, we could instead introduce a new
>> >>>     TimerSpec proto to represent TimerMap.
>> >>>
>> >>>     Thoughts?
>> >>>
>> >>>     Reuven
>> >>>
>>
>

Re: Proposal: Dynamic timer support (BEAM-6857)

Posted by Reuven Lax <re...@google.com>.
Kenn:
+1 to using TimerFamily instead of TimerId and TimerMap.

Jan:
This is not just for DSLs. I've seen cases where the user wants different
timers based on input data, so they cannot be defined statically. As a
thought experiment: one stated goal of state + timers was to provide the
low-level tools we use to implement windowing. However, to implement
windowing you need a dynamic set of timers, not just a single one. Most
users don't need to reimplement windowing (though we have had some users who
did, when they wanted something slightly different from what native Beam
windowing provided), but the need for dynamic timers is not unheard of.
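
A minimal sketch of the data-driven case, assuming a stream of order ids
where each order needs its own timeout. It is plain Java over an in-memory
map, not the Beam API: DataDrivenTimersSketch, process and advanceTo are
hypothetical names, and the sketch ignores keys, windows and time domains.
The point is only that the set of timer tags is whatever the data contains.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for one timer family; NOT the Beam API.
final class DataDrivenTimersSketch {
  // Timer tag -> firing time in epoch millis. Tags come from the data.
  private final Map<String, Long> pending = new TreeMap<>();

  // Sets (or resets) the timer whose tag is taken from the element itself.
  void process(String orderId, long eventTimeMillis, long timeoutMillis) {
    pending.put(orderId, eventTimeMillis + timeoutMillis);
  }

  // Fires every timer at or before the watermark; returns tags in tag order.
  List<String> advanceTo(long watermarkMillis) {
    List<String> fired = new ArrayList<>();
    Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> e = it.next();
      if (e.getValue() <= watermarkMillis) {
        fired.add(e.getKey()); // single callback, told which tag fired
        it.remove();
      }
    }
    return fired;
  }
}
```

None of the tags ("order-7", "order-9", ...) can be written down as a
@TimerId at compile time, which is the limitation being discussed.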

+1 to allowing dynamic state. However I think this is separate enough from
timers that it doesn't need to be coupled in this discussion. Dynamic state
also raises the wrinkle of pipeline compatibility (as you mentioned),
which I think is a bit less of an issue for dynamic timers.

Allowing a DSL to specify a DoFnSignature does not quite solve this
problem. The DSL still needs a way to set and process the timers. It also
does not solve the problem where the timers are based on input data
elements, so cannot be known at pipeline construction time. However what
might be more important is statically defining the timer families, and a
DSL could do this by specifying a DoFnSignature (and something similar
could be done with state). Also as mentioned above, this is useful to
normal Beam users as well, and we shouldn't force normal users to start
dealing with DoFnSignatures and DoFnInvokers.






On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Max,
>
> wouldn't that be actually the same as
>
> class MyDoFn extends DoFn<String, String> {
>
>
>    @ProcessElement
>    public void process(
>        ProcessContext context) {
>      // "get" would register a new TimerSpec
>      Timer timer1 = context.getTimer("timer1");
>      Timer timer2 = context.getTimer("timer2");
>      timers.set(...);
>      timers.set(...);
>    }
>
> That is - no need to declare anything? One more concern about that - if
> we allow registration of timers (or even state) dynamically like that it
> might be harder to perform validation of pipeline upon upgrades.
>
> Jan
>
> On 10/22/19 4:47 PM, Maximilian Michels wrote:
> > The idea makes sense to me. I really like that Beam gives upfront
> > specs for timer and state, but it is not flexible enough for
> > timer-based libraries or for users which want to dynamically generate
> > timers.
> >
> > I'm not sure about the proposed API yet. Shouldn't we separate the
> > timer specs from setting actual timers?
> >
> > Suggestion:
> >
> > class MyDoFn extends DoFn<String, String> {
> >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
> >
> >   @ProcessElement
> >   public void process(
> >       @Element String e,
> >       @TimerMap TimerMap timers)) {
> >     // "get" would register a new TimerSpec
> >     Timer timer1 = timers.get("timer1");
> >     Timer timer2 = timers.get("timer2");
> >     timers.set(...);
> >     timers.set(...);
> >   }
> >
> >   // No args for "@OnTimer" => use generic TimerMap
> >   @OnTimer
> >   public void onTimer(
> >       @TimerId String timerFired,
> >       @Timestamp Instant timerTs,
> >       @TimerMap TimerMap timers) {
> >      // Timer firing
> >      ...
> >      // Set this timer (or another)
> >      Timer timer = timers.get(timerFired);
> >      timer.set(...);
> >   }
> > }
> >
> > What do you think?
> >
> > -Max
> >
> > On 22.10.19 10:35, Jan Lukavský wrote:
> >> Hi Kenn,
> >>
> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
> >>> This seems extremely useful.
> >>>
> >>> I assume you mean `@OnTimer("timers")` in your example. I would
> >>> suggest that the parameter annotation be something other
> >>> than @TimerId since that annotation is already used for a very
> >>> similar but different purpose; they are close enough that it is
> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
> >>> keep @TimerId in the parameter list and change the declaration
> >>> to @TimerFamily("timers"). I think "family" or "group" may be more
> >>> clear naming than "map".
> >>>
> >>> At the portability level, this API does seem to be pretty close to a
> >>> noop in terms of the messages that needs to be sent over the Fn API,
> >>> so it makes sense to loosen the protos. By the time the Fn API is in
> >>> play, all of our desires to catch errors prior to execution are
> >>> irrelevant anyhow.
> >>>
> >>> On the other hand, I think DSLs have a different & bigger problem
> >>> than this, in that they want to programmatically adjust all the
> >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
> >>> another. Certainly some limited DSL use cases are addressed by this,
> >>> but I wouldn't take that as a primary use case for this feature.
> >>> Ultimately they are probably better served by being able to
> >>> explicitly author a DoFnInvoker and provide it to a variant of
> >>> beam:transforms:ParDo where the do_fn field is a serialized
> >>> DoFnInvoker. Now that I think about this, I cannot recall why we
> >>> don't already ship a DoFnSignature & DoFnInvoker as the payload.
> >>> That would allow maximum flexibility in utilizing the portability
> >>> framework.
> >>
> >> yes, exactly, but when DSLs are in question, we have to make sure
> >> that DSLs are not bound to portability - we have to be able to
> >> translate even in case of "legacy" runners as well. That might
> >> complicate things a bit maybe.
> >>
> >> Jan
> >>
> >>>
> >>> Kenn
> >>>
> >>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <relax@google.com
> >>> <ma...@google.com>> wrote:
> >>>
> >>>     BEAM-6857 documents the need for dynamic timer support in the Beam
> >>>     API. I wanted to make a proposal for what this API would look
> >>>     like, and how to express it in the portability protos.
> >>>
> >>>     Background: Today Beam (especially BeamJava) requires a ParDo to
> >>>     statically declare all timers it accesses at compile time. For
> >>>     example:
> >>>
> >>>     class MyDoFn extends DoFn<String, String> {
> >>>       @TimerId("timer1") TimerSpec timer1 =
> >>>     TimerSpecs.timer(TimeDomain(EVENT_TIME));
> >>>       @TimerId("timer2") TimerSpec timer2 =
> >>>     TimerSpecs.timer(TimeDomain(PROCESSING_TIME));
> >>>
> >>>       @ProcessElement
> >>>       public void process(@Element String e, @TimerId("timer1") Timer
> >>>     timer1, @TimerId("timer2") Timer timer2)) {
> >>>         timer1.set(...);
> >>>         timer2.set(...);
> >>>       }
> >>>
> >>>       @OnTimer("timer1") public void onTimer1() { ... }
> >>>       @OnTimer("timer2") public void onTimer2() { ... }
> >>>     }
> >>>
> >>>     This requires the author of a ParDo to know the full list of
> >>>     timers ahead of time, which has been problematic in many cases.
> >>>     One example where it causes issues is for DSLs such as Euphoria or
> >>>     Scio. DSL authors usually write ParDos to interpret the code
> >>>     written in the high-level DSL, and so don't know ahead of time the
> >>>     list of timers needed; alternatives today are quite ugly: physical
> >>>     code generation or creating a single timer that multiplexes all of
> >>>     the users logical timers. There are also cases where a ParDo needs
> >>>     multiple distinct timers, but the set of distinct timers is
> >>>     controlled by the input data, and therefore not knowable in
> >>>     advance. The Beam timer API has been insufficient for these
> >>>     use cases.
> >>>
> >>>     I propose a new TimerMap construct, which allow a ParDo to
> >>>     dynamically set named timers. It's use in the Java API would look
> >>>     as follows:
> >>>
> >>>     class MyDoFn extends DoFn<String, String> {
> >>>       @TimerId("timers") TimerSpec timers =
> >>>     TimerSpecs.timerMap(TimeDomain(EVENT_TIME));
> >>>
> >>>       @ProcessElement
> >>>       public void process(@Element String e, @TimerId("timers")
> >>>     TimerMap timer)) {
> >>>         timers.set("timer1", ...);
> >>>         timers.set("timer2", ...);
> >>>       }
> >>>
> >>>       @OnTimer("timer") public void onTimer(@TimerId String
> >>>     timerFired, @Timestamp Instant timerTs) { ... }
> >>>     }
> >>>
> >>>     There is a new TimerSpec type to specify a TimerMap. The TimerMap
> >>>     class itself allows dynamically setting multiple timers based on a
> >>>     String tag argument. Each TimerMap has a single callback which
> >>>     when called is given the id of the timer that is currently firing.
> >>>
> >>>     It is allowed to have multiple TimerMap objects in a ParDo (and
> >>>     required if you want to have both processing-time and event-time
> >>>     timers in the same ParDo). Each TimerMap is its own logical
> >>>     namespace. i.e. if the user sets timers with the same string tag
> >>>     on different TimerMap objects the timers will not collide.
> >>>
> >>>     Currently the portability protos were written to mirror the Java
> >>>     API, expecting one TimerSpec per timer accessed by the ParDo. I
> >>>     suggest that we instead make TimerMap the default for portability,
> >>>     and model the current behavior on top of timer map. If this proves
> >>>     problematic for some runners, we could instead introduce a new
> >>>     TimerSpec proto to represent TimerMap.
> >>>
> >>>     Thoughts?
> >>>
> >>>     Reuven
> >>>
>

Re: Proposal: Dynamic timer support (BEAM-6857)

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Max,

Wouldn't that actually be the same as

class MyDoFn extends DoFn<String, String> {


   @ProcessElement
   public void process(
       ProcessContext context) {
     // "get" would register a new TimerSpec
     Timer timer1 = context.getTimer("timer1");
     Timer timer2 = context.getTimer("timer2");
     timer1.set(...);
     timer2.set(...);
   }
}

That is, with no need to declare anything? One more concern about that: if
we allow timers (or even state) to be registered dynamically like that, it
might be harder to validate a pipeline upon upgrade.
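
One way to picture the validation concern, as a sketch only: if timer ids are
declared up front, an upgrade check can compare the old and new declaration
sets before anything runs. The class UpgradeCheckSketch and the method
removedDeclarations are hypothetical, and real runners may validate upgrades
quite differently.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical pre-flight check; NOT an existing Beam or runner API.
final class UpgradeCheckSketch {
  // Returns ids declared by the old pipeline that the new one no longer has.
  static Set<String> removedDeclarations(Set<String> oldIds, Set<String> newIds) {
    Set<String> removed = new TreeSet<>(oldIds);
    removed.removeAll(newIds);
    return removed;
  }
}
```

With fully dynamic registration both sets are empty at construction time, so
a check of this shape has nothing to compare and any mismatch would only show
up at execution.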

Jan

On 10/22/19 4:47 PM, Maximilian Michels wrote:
> The idea makes sense to me. I really like that Beam gives upfront 
> specs for timer and state, but it is not flexible enough for 
> timer-based libraries or for users which want to dynamically generate 
> timers.
>
> I'm not sure about the proposed API yet. Shouldn't we separate the 
> timer specs from setting actual timers?
>
> Suggestion:
>
> class MyDoFn extends DoFn<String, String> {
>   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>
>   @ProcessElement
>   public void process(
>       @Element String e,
>       @TimerMap TimerMap timers)) {
>     // "get" would register a new TimerSpec
>     Timer timer1 = timers.get("timer1");
>     Timer timer2 = timers.get("timer2");
>     timers.set(...);
>     timers.set(...);
>   }
>
>   // No args for "@OnTimer" => use generic TimerMap
>   @OnTimer
>   public void onTimer(
>       @TimerId String timerFired,
>       @Timestamp Instant timerTs,
>       @TimerMap TimerMap timers) {
>      // Timer firing
>      ...
>      // Set this timer (or another)
>      Timer timer = timers.get(timerFired);
>      timer.set(...);
>   }
> }
>
> What do you think?
>
> -Max
>
> On 22.10.19 10:35, Jan Lukavský wrote:
>> Hi Kenn,
>>
>> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>>> This seems extremely useful.
>>>
>>> I assume you mean `@OnTimer("timers")` in your example. I would 
>>> suggest that the parameter annotation be something other 
>>> than @TimerId since that annotation is already used for a very 
>>> similar but different purpose; they are close enough that it is 
>>> tempting to pun them, but it is clearer to keep them distinct IMO. 
>>> Perhaps @TimerName or @TimerKey or some such. Alternatively, 
>>> keep @TimerId in the parameter list and change the declaration 
>>> to @TimerFamily("timers"). I think "family" or "group" may be more 
>>> clear naming than "map".
>>>
>>> At the portability level, this API does seem to be pretty close to a 
>>> noop in terms of the messages that needs to be sent over the Fn API, 
>>> so it makes sense to loosen the protos. By the time the Fn API is in 
>>> play, all of our desires to catch errors prior to execution are 
>>> irrelevant anyhow.
>>>
>>> On the other hand, I think DSLs have a different & bigger problem 
>>> than this, in that they want to programmatically adjust all the 
>>> capabilities of a DoFn. Same goes for wrapping one DoFn in 
>>> another. Certainly some limited DSL use cases are addressed by this, 
>>> but I wouldn't take that as a primary use case for this feature. 
>>> Ultimately they are probably better served by being able to 
>>> explicitly author a DoFnInvoker and provide it to a variant of 
>>> beam:transforms:ParDo where the do_fn field is a serialized 
>>> DoFnInvoker. Now that I think about this, I cannot recall why we 
>>> don't already ship a DoFnSignature & DoFnInvoker as the payload. 
>>> That would allow maximum flexibility in utilizing the portability 
>>> framework.
>>
>> yes, exactly, but when DSLs are in question, we have to make sure 
>> that DSLs are not bound to portability - we have to be able to 
>> translate even in case of "legacy" runners as well. That might 
>> complicate things a bit maybe.
>>
>> Jan
>>
>>>
>>> Kenn
>>>
>>> On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax <relax@google.com 
>>> <ma...@google.com>> wrote:
>>>
>>>     BEAM-6857 documents the need for dynamic timer support in the Beam
>>>     API. I wanted to make a proposal for what this API would look
>>>     like, and how to express it in the portability protos.
>>>
>>>     Background: Today Beam (especially BeamJava) requires a ParDo to
>>>     statically declare all timers it accesses at compile time. For
>>>     example:
>>>
>>>     class MyDoFn extends DoFn<String, String> {
>>>       @TimerId("timer1") TimerSpec timer1 =
>>>     TimerSpecs.timer(TimeDomain(EVENT_TIME));
>>>       @TimerId("timer2") TimerSpec timer2 =
>>>     TimerSpecs.timer(TimeDomain(PROCESSING_TIME));
>>>
>>>       @ProcessElement
>>>       public void process(@Element String e, @TimerId("timer1") Timer
>>>     timer1, @TimerId("timer2") Timer timer2)) {
>>>         timer1.set(...);
>>>         timer2.set(...);
>>>       }
>>>
>>>       @OnTimer("timer1") public void onTimer1() { ... }
>>>       @OnTimer("timer2") public void onTimer2() { ... }
>>>     }
>>>
>>>     This requires the author of a ParDo to know the full list of
>>>     timers ahead of time, which has been problematic in many cases.
>>>     One example where it causes issues is for DSLs such as Euphoria or
>>>     Scio. DSL authors usually write ParDos to interpret the code
>>>     written in the high-level DSL, and so don't know ahead of time the
>>>     list of timers needed; alternatives today are quite ugly: physical
>>>     code generation or creating a single timer that multiplexes all of
>>>     the users logical timers. There are also cases where a ParDo needs
>>>     multiple distinct timers, but the set of distinct timers is
>>>     controlled by the input data, and therefore not knowable in
>>>     advance. The Beam timer API has been insufficient for these
>>>     use cases.
>>>
>>>     I propose a new TimerMap construct, which allow a ParDo to
>>>     dynamically set named timers. It's use in the Java API would look
>>>     as follows:
>>>
>>>     class MyDoFn extends DoFn<String, String> {
>>>       @TimerId("timers") TimerSpec timers =
>>>     TimerSpecs.timerMap(TimeDomain(EVENT_TIME));
>>>
>>>       @ProcessElement
>>>       public void process(@Element String e, @TimerId("timers")
>>>     TimerMap timer)) {
>>>         timers.set("timer1", ...);
>>>         timers.set("timer2", ...);
>>>       }
>>>
>>>       @OnTimer("timer") public void onTimer(@TimerId String
>>>     timerFired, @Timestamp Instant timerTs) { ... }
>>>     }
>>>
>>>     There is a new TimerSpec type to specify a TimerMap. The TimerMap
>>>     class itself allows dynamically setting multiple timers based on a
>>>     String tag argument. Each TimerMap has a single callback which
>>>     when called is given the id of the timer that is currently firing.
>>>
>>>     It is allowed to have multiple TimerMap objects in a ParDo (and
>>>     required if you want to have both processing-time and event-time
>>>     timers in the same ParDo). Each TimerMap is its own logical
>>>     namespace. i.e. if the user sets timers with the same string tag
>>>     on different TimerMap objects the timers will not collide.
>>>
>>>     Currently the portability protos were written to mirror the Java
>>>     API, expecting one TimerSpec per timer accessed by the ParDo. I
>>>     suggest that we instead make TimerMap the default for portability,
>>>     and model the current behavior on top of timer map. If this proves
>>>     problematic for some runners, we could instead introduce a new
>>>     TimerSpec proto to represent TimerMap.
>>>
>>>     Thoughts?
>>>
>>>     Reuven
>>>

Re: Proposal: Dynamic timer support (BEAM-6857)

Posted by Maximilian Michels <mx...@apache.org>.
The idea makes sense to me. I really like that Beam gives upfront specs
for timers and state, but it is not flexible enough for timer-based
libraries or for users who want to dynamically generate timers.

I'm not sure about the proposed API yet. Shouldn't we separate the timer 
specs from setting actual timers?

Suggestion:

class MyDoFn extends DoFn<String, String> {
   @TimerMap TimerMap timers = TimerSpecs.timerMap();

   @ProcessElement
   public void process(
       @Element String e,
       @TimerMap TimerMap timers) {
     // "get" would register a new TimerSpec
     Timer timer1 = timers.get("timer1");
     Timer timer2 = timers.get("timer2");
     timer1.set(...);
     timer2.set(...);
   }

   // No args for "@OnTimer" => use generic TimerMap
   @OnTimer
   public void onTimer(
       @TimerId String timerFired,
       @Timestamp Instant timerTs,
       @TimerMap TimerMap timers) {
      // Timer firing
      ...
      // Set this timer (or another)
      Timer timer = timers.get(timerFired);
      timer.set(...);
   }
}
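
To make the get-then-set shape concrete, here is a minimal plain-Java sketch.
TimerMapSketch, TimerHandle and targetOf are hypothetical names backed by an
in-memory map; this is not Beam's API and it ignores keys, windows and time
domains. It only shows that a handle obtained by name writes into the shared
map under that name.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory timer map; NOT the Beam API.
final class TimerMapSketch {
  // Timer id -> firing time, shared by all handles from this map.
  private final Map<String, Instant> targets = new HashMap<>();

  // Handle for one named timer; setting it records the time under its id.
  final class TimerHandle {
    private final String id;

    private TimerHandle(String id) {
      this.id = id;
    }

    void set(Instant ts) {
      targets.put(id, ts);
    }
  }

  // Returns a handle for the given id; nothing is recorded until set().
  TimerHandle get(String id) {
    return new TimerHandle(id);
  }

  // Firing time for the id, or null if that timer was never set.
  Instant targetOf(String id) {
    return targets.get(id);
  }
}
```

In this sketch get() registers nothing by itself; whether "get" should
register a spec, as the comment in the suggestion says, is a design choice
the sketch does not settle.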

What do you think?

-Max

On 22.10.19 10:35, Jan Lukavský wrote:
> Hi Kenn,
> 
> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>> This seems extremely useful.
>>
>> I assume you mean `@OnTimer("timers")` in your example. I would 
>> suggest that the parameter annotation be something other than @TimerId 
>> since that annotation is already used for a very similar but different 
>> purpose; they are close enough that it is tempting to pun them, but it 
>> is clearer to keep them distinct IMO. Perhaps @TimerName or @TimerKey 
>> or some such. Alternatively, keep @TimerId in the parameter list and 
>> change the declaration to @TimerFamily("timers"). I think "family" or 
>> "group" may be more clear naming than "map".
>>
>> At the portability level, this API does seem to be pretty close to a 
>> noop in terms of the messages that needs to be sent over the Fn API, 
>> so it makes sense to loosen the protos. By the time the Fn API is in 
>> play, all of our desires to catch errors prior to execution are 
>> irrelevant anyhow.
>>
>> On the other hand, I think DSLs have a different & bigger problem than 
>> this, in that they want to programmatically adjust all the 
>> capabilities of a DoFn. Same goes for wrapping one DoFn in 
>> another. Certainly some limited DSL use cases are addressed by this, 
>> but I wouldn't take that as a primary use case for this feature. 
>> Ultimately they are probably better served by being able to explicitly 
>> author a DoFnInvoker and provide it to a variant of 
>> beam:transforms:ParDo where the do_fn field is a serialized 
>> DoFnInvoker. Now that I think about this, I cannot recall why we don't 
>> already ship a DoFnSignature & DoFnInvoker as the payload. That would 
>> allow maximum flexibility in utilizing the portability framework.
> 
> Yes, exactly, but where DSLs are concerned, we have to make sure that 
> DSLs are not bound to portability - we have to be able to translate for 
> "legacy" runners as well. That might complicate things a bit.
> 
> Jan
> 

Re: Proposal: Dynamic timer support (BEAM-6857)

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Kenn,

On 10/22/19 2:48 AM, Kenneth Knowles wrote:
> This seems extremely useful.
>
> I assume you mean `@OnTimer("timers")` in your example. I would 
> suggest that the parameter annotation be something other than @TimerId 
> since that annotation is already used for a very similar but different 
> purpose; they are close enough that it is tempting to pun them, but it 
> is clearer to keep them distinct IMO. Perhaps @TimerName or @TimerKey 
> or some such. Alternatively, keep @TimerId in the parameter list and 
> change the declaration to @TimerFamily("timers"). I think "family" or 
> "group" may be more clear naming than "map".
>
> At the portability level, this API does seem to be pretty close to a 
> noop in terms of the messages that need to be sent over the Fn API, 
> so it makes sense to loosen the protos. By the time the Fn API is in 
> play, all of our desires to catch errors prior to execution are 
> irrelevant anyhow.
>
> On the other hand, I think DSLs have a different & bigger problem than 
> this, in that they want to programmatically adjust all the 
> capabilities of a DoFn. Same goes for wrapping one DoFn in 
> another. Certainly some limited DSL use cases are addressed by this, 
> but I wouldn't take that as a primary use case for this feature. 
> Ultimately they are probably better served by being able to explicitly 
> author a DoFnInvoker and provide it to a variant of 
> beam:transforms:ParDo where the do_fn field is a serialized 
> DoFnInvoker. Now that I think about this, I cannot recall why we don't 
> already ship a DoFnSignature & DoFnInvoker as the payload. That would 
> allow maximum flexibility in utilizing the portability framework.

Yes, exactly, but where DSLs are concerned, we have to make sure that 
DSLs are not bound to portability - we have to be able to translate for 
"legacy" runners as well. That might complicate things a bit.

Jan


Re: Proposal: Dynamic timer support (BEAM-6857)

Posted by Kenneth Knowles <ke...@apache.org>.
This seems extremely useful.

I assume you mean `@OnTimer("timers")` in your example. I would suggest
that the parameter annotation be something other than @TimerId since that
annotation is already used for a very similar but different purpose; they
are close enough that it is tempting to pun them, but it is clearer to keep
them distinct IMO. Perhaps @TimerName or @TimerKey or some such.
Alternatively, keep @TimerId in the parameter list and change the
declaration to @TimerFamily("timers"). I think "family" or "group" may be
more clear naming than "map".
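
As a rough illustration of the @TimerFamily naming, the sketch below declares
such an annotation and reads the declared family ids back via reflection. It
is self-contained and does not depend on Beam: the TimerFamily annotation
defined here, NamingSketchFn and TimerFamilyScanner are hypothetical names
made up for this example, and the annotated fields are plain placeholders
rather than real TimerSpecs.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical annotation for this sketch only: names a whole family of timers.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.PARAMETER})
@interface TimerFamily {
  String value();
}

// Hypothetical DoFn-like class; the Object fields stand in for timer specs.
final class NamingSketchFn {
  @TimerFamily("timers")
  final Object timers = new Object();

  @TimerFamily("cleanup")
  final Object cleanup = new Object();
}

final class TimerFamilyScanner {
  /** Returns the family ids declared on fnClass, sorted for a stable order. */
  static List<String> declaredFamilies(Class<?> fnClass) {
    List<String> ids = new ArrayList<>();
    for (Field field : fnClass.getDeclaredFields()) {
      TimerFamily family = field.getAnnotation(TimerFamily.class);
      if (family != null) {
        ids.add(family.value());
      }
    }
    Collections.sort(ids); // getDeclaredFields() guarantees no particular order
    return ids;
  }
}
```

With the family named by its own annotation at the declaration, @TimerId stays
free to mean the id of an individual timer in the callback's parameter list.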

At the portability level, this API does seem to be pretty close to a noop
in terms of the messages that need to be sent over the Fn API, so it makes
sense to loosen the protos. By the time the Fn API is in play, all of our
desires to catch errors prior to execution are irrelevant anyhow.

On the other hand, I think DSLs have a different & bigger problem than
this, in that they want to programmatically adjust all the capabilities of
a DoFn. Same goes for wrapping one DoFn in another. Certainly some limited
DSL use cases are addressed by this, but I wouldn't take that as a primary
use case for this feature. Ultimately they are probably better served by
being able to explicitly author a DoFnInvoker and provide it to a variant
of beam:transforms:ParDo where the do_fn field is a serialized DoFnInvoker.
Now that I think about this, I cannot recall why we don't already ship a
DoFnSignature & DoFnInvoker as the payload. That would allow maximum
flexibility in utilizing the portability framework.

Kenn
