Posted to dev@beam.apache.org by Jan Lukavský <je...@seznam.cz> on 2019/07/01 08:03:54 UTC

Re: [DISCUSS] Thoughts on stateful DoFns in merging windows

What are the issues you see with merging timers? Does it "only" suffer 
from the same issue as early emitting from merging windows (i.e. needs 
retractions to work correctly), or are there some other issues?

On 6/28/19 3:48 PM, Reuven Lax wrote:
> We do have merging state. However, merging timers is a bit more
> awkward. I now tend to think that we're better off providing an
> onMerge function and letting the user handle this.
>
> On Fri, Jun 28, 2019, 11:06 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>     Hi,
>
>     during my implementation of @RequiresTimeSortedInput I found out that
>     current runners do not support stateful DoFns on merging windows [1].
>     The question is: why is that? One obvious reason seems to be that the
>     current definition of StateSpec doesn't define a state merge function,
>     which is necessary for merging windows to work. Actually, in Euphoria
>     [2] (originated separately, now merged into Beam) we had only two basic
>     operators (PTransforms): FlatMap (stateless ParDo) and an operation we
>     called ReduceStateByKey [3] (not merged into Beam, as there were some
>     difficulties, one of which might be the missing support for merging
>     windows). All the other operations could be derived from these two. The
>     ReduceStateByKey (RSBK) operator was a keyed operator (shuffle) with the
>     following user-defined functions:
>
>       - state factory (roughly equivalent to declaring a StateSpec)
>
>       - state merge function (State1 + State2 = State3)
>
>       - state update function (State + Value = new State)
>
>       - and state extract function (State -> Output) -- actually called
>     after each element was added to the state
>
>     Now if you look at this, this is essentially both Beam's Combine
>     operator and stateful ParDo. Moreover, GroupByKey is just RSBK with
>     BagState and an append merge function. So, a question that comes to
>     mind: what would happen if we added a state merge function to
>     StateSpec? I think it could have the following benefits:
>
>       - Both Combine and GroupByKey can become derived operators (this is
>     not a breaking change, as runners are always free to provide their
>     own override for any PTransform)
>
>       - in batch, stateful ParDo can now be implemented more efficiently,
>     using the Combine operation (as long as it doesn't use
>     @RequiresTimeSortedInput, which is my favourite :))
>
>       - even in streaming, a combining approach to stateful ParDo would be
>     possible (provided the trigger is AfterWatermark with no early
>     firings, and there are no user timers)
>
>       - there is still a problem with merging windows on stateful DoFns,
>     which is early firings in general (that needs retractions, which is
>     what we first hit here [4], and solved by disabling early emitting
>     from merging windows)
>
>     I'd really like to hear any comments on this.
>
>     Jan
>
>     [1]
>     https://github.com/apache/beam/blob/1992cde69343b6e8bb5eea537182af3d036d155d/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L71
>
>     [2]
>     https://github.com/apache/beam/tree/master/sdks/java/extensions/euphoria
>
>     [3]
>     https://github.com/seznam/euphoria/blob/master/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java
>
>     [4] https://github.com/seznam/euphoria/issues/43
>

Re: [DISCUSS] Thoughts on stateful DoFns in merging windows

Posted by Kenneth Knowles <ke...@apache.org>.
In practice, Combine + GBK are actually implemented using the same
underlying code as user-facing state & timers. So you are very right :-). I
think the use cases are very different, though, as are the requirements:
user-facing state has to be more "safe by default".

The way we could do timer merging if we treated it the same as state is to
have the user provide a TimestampCombiner or CombineFn for merging the
timers. I don't really like this. I agree with Reuven that for stateful
DoFn it seems cleaner to just have @OnMerge.

Under the hood, StateInternals has automatic merging for CombiningState and
BagState. I had always thought that eventually we might expose this to user
state. But again I'm not so sure it is the right design. State is not that
useful without timers, and once you write @OnMerge for the timers you
probably can merge the state too, otherwise it will get confusing.
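
As a rough illustration of what "automatic merging" means for bag-like and
combining state, here is a minimal sketch in plain Python. It is a model
only, not Beam's StateInternals: the window representation, the dict-based
state store, and all function names are assumptions made for the example.

```python
from functools import reduce
from typing import Callable, Dict, List, Tuple, TypeVar

A = TypeVar("A")
# Hypothetical window representation: (start, end) timestamps.
Window = Tuple[int, int]


def merged_window(sources: List[Window]) -> Window:
    """Span covering all source windows, as a session-style merge would produce."""
    return (min(w[0] for w in sources), max(w[1] for w in sources))


def merge_bag_state(
    state: Dict[Window, List[A]], sources: List[Window]
) -> Dict[Window, List[A]]:
    """Bag-like state merges by concatenating the bags of the source windows."""
    target = merged_window(sources)
    merged = [x for w in sources for x in state.get(w, [])]
    out = {w: v for w, v in state.items() if w not in sources}
    out[target] = merged
    return out


def merge_combining_state(
    state: Dict[Window, A],
    sources: List[Window],
    merge_accumulators: Callable[[A, A], A],
) -> Dict[Window, A]:
    """Combining state merges by folding the accumulators of the source windows."""
    target = merged_window(sources)
    accumulators = [state[w] for w in sources if w in state]
    out = {w: v for w, v in state.items() if w not in sources}
    if accumulators:
        out[target] = reduce(merge_accumulators, accumulators)
    return out


bags = {(0, 10): ["a", "b"], (5, 15): ["c"], (100, 110): ["z"]}
print(merge_bag_state(bags, [(0, 10), (5, 15)]))

sums = {(0, 10): 3, (5, 15): 4}
print(merge_combining_state(sums, [(0, 10), (5, 15)], lambda a, b: a + b))
```

Both cases work without user input only because the merge operation is
implied by the state type; a plain value state has no such implied operation.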

And as a last point, once we have retractions (which are moving again a
little bit, yay!) it is very likely that a stateful DoFn will have a reason
to emit retractions from @OnMerge. So doing anything automatically has even
less benefit.

Kenn

On Tue, Jul 2, 2019 at 1:27 AM Jan Lukavský <je...@seznam.cz> wrote:

> Understood. I think I can write up a design document of the API changes that
> would be required to implement this. Unfortunately, I don't have the capacity
> to implement this myself (mostly because I actually don't have a use case
> for it). This discussion was meant as a kind of theoretical exercise to see
> whether we can find benefits in viewing Combine and GroupByKey as special
> cases of stateful DoFns. I think there are two direct consequences of that:
>
>  a) it might be a little easier to implement a new runner (as it might
> suffice to implement stateless and stateful ParDos to get "at least
> something up and running")
>
>  b) it opens up some optimizations, e.g. using combineByKey in batch to
> implement stateful (unordered) ParDo; probably even in streaming it would
> be possible to push some calculations before the shuffle (timers would
> trigger state shuffle and merge downstream)
>
> Maybe we can find even more benefits. Would this be something worth
> exploring? Would anyone be interested in this?
>
> Jan

Re: [DISCUSS] Thoughts on stateful DoFns in merging windows

Posted by Jan Lukavský <je...@seznam.cz>.
Understood. I think I can write up a design document of the API changes that
would be required to implement this. Unfortunately, I don't have the
capacity to implement this myself (mostly because I actually don't have
a use case for it). This discussion was meant as a kind of theoretical
exercise to see whether we can find benefits in viewing Combine and
GroupByKey as special cases of stateful DoFns. I think there are two
direct consequences of that:

  a) it might be a little easier to implement a new runner (as it might
suffice to implement stateless and stateful ParDos to get "at least
something up and running")

  b) it opens up some optimizations, e.g. using combineByKey in batch to
implement stateful (unordered) ParDo; probably even in streaming it
would be possible to push some calculations before the shuffle (timers
would trigger state shuffle and merge downstream)
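
To make point b) concrete, here is a minimal sketch in plain Python of how
the four RSBK-style functions (create, update, merge, extract) allow partial
states to be built per partition before the shuffle and merged afterwards.
It models the batch case only, with extract called once at the end. All
names (StateFns, reduce_partition, reduce_state_by_key, SUM, GROUP) are
hypothetical and are not Beam or Euphoria APIs.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Hashable, Iterable, List, Tuple


@dataclass(frozen=True)
class StateFns:
    """The four user-defined functions of an RSBK-like operator (hypothetical)."""
    create: Callable[[], Any]          # state factory
    update: Callable[[Any, Any], Any]  # State + Value = new State
    merge: Callable[[Any, Any], Any]   # State1 + State2 = State3
    extract: Callable[[Any], Any]      # State -> Output


def reduce_partition(
    fns: StateFns, elements: Iterable[Tuple[Hashable, Any]]
) -> Dict[Hashable, Any]:
    """Pre-shuffle step: build one partial state per key within a partition."""
    states: Dict[Hashable, Any] = {}
    for key, value in elements:
        state = states[key] if key in states else fns.create()
        states[key] = fns.update(state, value)
    return states


def reduce_state_by_key(
    fns: StateFns, partitions: List[List[Tuple[Hashable, Any]]]
) -> Dict[Hashable, Any]:
    """Post-shuffle step: merge the partial states per key, then extract output."""
    merged: Dict[Hashable, Any] = {}
    for partition in partitions:
        for key, state in reduce_partition(fns, partition).items():
            merged[key] = fns.merge(merged[key], state) if key in merged else state
    return {key: fns.extract(state) for key, state in merged.items()}


# A Combine-like sum and a GroupByKey-like grouping expressed with the same operator.
SUM = StateFns(create=lambda: 0, update=lambda s, v: s + v,
               merge=lambda a, b: a + b, extract=lambda s: s)
GROUP = StateFns(create=list, update=lambda s, v: s + [v],
                 merge=lambda a, b: a + b, extract=lambda s: s)

partitions = [[("a", 1), ("b", 2)], [("a", 3)]]
print(reduce_state_by_key(SUM, partitions))
print(reduce_state_by_key(GROUP, partitions))
```

The pre-shuffle step is only valid when the result does not depend on
element order, which is why the sketch applies to the unordered case.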

Maybe we can find even more benefits. Would this be something worth 
exploring? Would anyone be interested in this?

Jan

On 7/1/19 11:51 PM, Reuven Lax wrote:
> The problem is that it's not obvious how to merge timers. For any 
> obvious strategy, I think I can imagine a use case where it will be 
> wrong.
>
> I'm leaning to the conclusion that we're much better off providing an 
> onMerge callback, and letting the user explicitly handle the merging 
> there.
>
> Reuven

Re: [DISCUSS] Thoughts on stateful DoFns in merging windows

Posted by Reuven Lax <re...@google.com>.
The problem is that it's not obvious how to merge timers. For any obvious
strategy, I think I can imagine a use case where it will be wrong.

I'm leaning toward the conclusion that we're much better off providing an
onMerge callback, and letting the user explicitly handle the merging there.
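
One way to picture the difficulty, as a sketch in plain Python rather than
any Beam API: two windows with pending timers merge, and the right timer for
the merged window depends on what the timer was meant to do. The two
policies below (a flush deadline and an inactivity timeout) and all names
are assumptions chosen for illustration.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical window representation: (start, end) timestamps.
Window = Tuple[int, int]
OnMerge = Callable[[Window, List[int]], int]


def merge_timers(
    timers: Dict[Window, int], sources: List[Window], on_merge: OnMerge
) -> Dict[Window, int]:
    """Replace the timers of the source windows with one chosen by the callback."""
    target = (min(s for s, _ in sources), max(e for _, e in sources))
    pending = [timers[w] for w in sources if w in timers]
    out = {w: t for w, t in timers.items() if w not in sources}
    if pending:
        out[target] = on_merge(target, pending)
    return out


def flush_deadline(window: Window, pending: List[int]) -> int:
    """'Emit no later than' semantics: the earliest pending timer must win."""
    return min(pending)


def inactivity_timeout(window: Window, pending: List[int]) -> int:
    """'Fire after the last activity' semantics: the latest pending timer must win."""
    return max(pending)


timers = {(0, 10): 12, (5, 15): 17}
sources = [(0, 10), (5, 15)]
print(merge_timers(timers, sources, flush_deadline))
print(merge_timers(timers, sources, inactivity_timeout))
```

A runner that hard-coded either policy would silently break the other use
case, whereas a callback leaves the choice with the code that set the timers.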

Reuven

On Mon, Jul 1, 2019 at 1:04 AM Jan Lukavský <je...@seznam.cz> wrote:

> What are the issues you see with merging timers? Does it "only" suffer
> from the same issue as early emitting from merging windows (i.e. needs
> retractions to work correctly), or are there some other issues?