You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Aljoscha Krettek <al...@apache.org> on 2015/12/10 16:47:24 UTC

[DISCUSS] Improving State/Timers/Windows

Hi All,
I want to discuss some ideas about improving the primitives/operations that Flink offers for user-state, timers and windows and how these concepts can be unified.

It has come up a lot lately that people have very specific requirements regarding the state that they keep and it seems necessary to allows users to set their own custom timers (on processing time and watermark time (event-time)) to do both expiration of state and implementation of custom windowing semantics. While we’re at this, we might also think about cleaning up the state handling a bit.

Let me first describe the status quo, so that we’re all on the same page. There are three types of state:
 - function state: this is the state that you get when a user function implements the Checkpointed interface. it is not partitioned
 - operator state: This is the state that a StreamOperator can snapshot, it is similar to the function state, but for operators. it is not partitioned
 - partitioned state: state that is scoped to the key of the incoming element, in Flink, this is (confusingly) called OperatorState and KvState (internally)

(Operator is the low-level concept, user functions are usually invoked by the operator, for example StreamMap is the operator that handles a MapFunction.)

Function state and operator state is not partitioned, meaning that it becomes difficult when we want to implement dynamic scale-in/scale-out. With partitioned state it can be redistributed when changing the degree of parallelism.

Both stream operators and user functions can have partitioned state, and the namespace is the same, i.e. the state can clash. The partitioned state will stay indefinitely if not manually cleared.

On to timers, operators can register processing-time callbacks, they can react to watermarks to implement event-time callbacks. They have to implement the logic themselves, however. For example, the WindowOperator has custom code to keep track of watermark timers and for reacting to watermarks. User functions have no way of registering timers. Also, timers are not scoped to any key. So if you register a timer while processing an element of a certain key, when the timer fires you don’t know what key was active when registering the timer. This might be necessary for cleaning up state for certain keys, or to trigger processing for a certain key only, for example with session windows of some kind.

Now, on to new stuff. I propose to add a timer facility that can be used by both operators and user functions. Both partitioned state and timers should be aware of keys and if a timer fires the partitioned state should be scoped to the same key that was active when the timer was registered.

One last bit. If we want to also implement the current WindowOperator on top of these generic facilities we need to have a way to scope state not only by key but also by windows (or better, some generic state scope). The reason is, that one key can have several active windows at one point in time and firing timers need to me mapped to the correct window (for example, for sliding windows, or session windows or what have you…).

Happy discussing. :D

Cheers,
Aljoscha



Re: [DISCUSS] Improving State/Timers/Windows

Posted by Paris Carbone <pa...@kth.se>.
+1 to all changes proposed, that is a reasonable step towards incremental snapshots and proper reconfiguration support. What is more interesting though is the actual implementations of the KVState derivatives, I am looking forward to see what you have in mind there. The operator/UDF KV namespace separation is not really a problem. We can just prefix the keys with a specific namespace ID. I would also suggest to abstract a memory management layer between KVstates and backend snapshots so we can plug different caching strategies.


> On 14 Dec 2015, at 11:14, Kostas Tzoumas <kt...@apache.org> wrote:
> 
> I suppose that they can start as sugar and evolve to a different
> implementation.
> 
> I would +1 the name change to KVState, OperatorState is indeed somewhat
> confusing, and it will only get harder to rename later.
> 
> On Mon, Dec 14, 2015 at 11:09 AM, Gyula Fóra <gy...@gmail.com> wrote:
> 
>> Would the Reducing/Folding states just be some API sugar on top of what we
>> have know (ValueState) or does it have some added functionality (like
>> incremental checkpoints for list states)?
>> 
>> Gyula
>> 
>> Aljoscha Krettek <al...@apache.org> ezt írta (időpont: 2015. dec. 14.,
>> H, 11:03):
>> 
>>> While enhancing the state interfaces we would also need to introduce new
>>> types of state. I was thinking of these, for a start:
>>> - ValueState (works like OperatorState works now, i.e. provides methods
>>> to get/set one state value
>>> - ListState, proves methods to add one element to a list of elements and
>>> to iterate over all contained elements
>>> - ReducingState, somewhat similar to value state but combines the added
>>> value to the existing value using a ReduceFunction
>>> - FoldingState, same as above but with fold
>>> 
>>> I think these are necessary to give the system more knowledge about the
>>> semantics of state so that it can handle the state more efficiently.
>> Think
>>> of incremental checkpoints, for example, these are easy to do if you know
>>> that state is a list to which stuff is only appended.
>>>> On 14 Dec 2015, at 10:52, Stephan Ewen <se...@apache.org> wrote:
>>>> 
>>>> A lot of this makes sense, but I am not sure about renaming
>>>> "OperatorState". The other name is nicer, but why make users' life hard
>>>> just for a name?
>>>> 
>>>> 
>>>> On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <mx...@apache.org>
>>> wrote:
>>>> 
>>>>> Hi Aljoscha,
>>>>> 
>>>>> Thanks for the informative technical description.
>>>>> 
>>>>>> - function state: this is the state that you get when a user function
>>>>> implements the Checkpointed interface. it is not partitioned
>>>>>> - operator state: This is the state that a StreamOperator can
>> snapshot,
>>>>> it is similar to the function state, but for operators. it is not
>>>>> partitioned
>>>>>> - partitioned state: state that is scoped to the key of the incoming
>>>>> element, in Flink, this is (confusingly) called OperatorState and
>>> KvState
>>>>> (internally)
>>>>> 
>>>>> Let's clean that up! Let's rename the OperatorState interface to
>>> KvState.
>>>>> 
>>>>>> Both stream operators and user functions can have partitioned state,
>>> and
>>>>> the namespace is the same, i.e. the state can clash. The partitioned
>>> state
>>>>> will stay indefinitely if not manually cleared.
>>>>> 
>>>>> I suppose operators currently have to take care to use a unique
>>>>> identifier for the state such that it doesn't clash with the user
>>>>> function. Wouldn't be too hard to introduce a scoping here.
>>>>> 
>>>>> Your proposal makes sense. It seems like this is a rather delicate
>>>>> change which improves the flexibility of the streaming API. What is
>>>>> the motivation behind this? I suppose you are thinking of improvements
>>>>> to the session capabilities of the streaming API.
>>>>> 
>>>>>> If we want to also implement the current WindowOperator on top of
>> these
>>>>> generic facilities we need to have a way to scope state not only by
>> key
>>> but
>>>>> also by windows (or better, some generic state scope).
>>>>> 
>>>>> This is currently handled by the WindowOperator itself and would then
>>>>> be delegated to the enhanced state interface? Makes sense if we want
>>>>> to make use of the new state interface. Again, is it just cleaner or
>>>>> does this enable new type of applications?
>>>>> 
>>>>> Cheers,
>>>>> Max
>>>>> 
>>>>> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <
>> aljoscha@apache.org>
>>>>> wrote:
>>>>>> Hi All,
>>>>>> I want to discuss some ideas about improving the
>> primitives/operations
>>>>> that Flink offers for user-state, timers and windows and how these
>>> concepts
>>>>> can be unified.
>>>>>> 
>>>>>> It has come up a lot lately that people have very specific
>> requirements
>>>>> regarding the state that they keep and it seems necessary to allows
>>> users
>>>>> to set their own custom timers (on processing time and watermark time
>>>>> (event-time)) to do both expiration of state and implementation of
>>> custom
>>>>> windowing semantics. While we’re at this, we might also think about
>>>>> cleaning up the state handling a bit.
>>>>>> 
>>>>>> Let me first describe the status quo, so that we’re all on the same
>>>>> page. There are three types of state:
>>>>>> - function state: this is the state that you get when a user function
>>>>> implements the Checkpointed interface. it is not partitioned
>>>>>> - operator state: This is the state that a StreamOperator can
>> snapshot,
>>>>> it is similar to the function state, but for operators. it is not
>>>>> partitioned
>>>>>> - partitioned state: state that is scoped to the key of the incoming
>>>>> element, in Flink, this is (confusingly) called OperatorState and
>>> KvState
>>>>> (internally)
>>>>>> 
>>>>>> (Operator is the low-level concept, user functions are usually
>> invoked
>>>>> by the operator, for example StreamMap is the operator that handles a
>>>>> MapFunction.)
>>>>>> 
>>>>>> Function state and operator state is not partitioned, meaning that it
>>>>> becomes difficult when we want to implement dynamic
>> scale-in/scale-out.
>>>>> With partitioned state it can be redistributed when changing the
>> degree
>>> of
>>>>> parallelism.
>>>>>> 
>>>>>> Both stream operators and user functions can have partitioned state,
>>> and
>>>>> the namespace is the same, i.e. the state can clash. The partitioned
>>> state
>>>>> will stay indefinitely if not manually cleared.
>>>>>> 
>>>>>> On to timers, operators can register processing-time callbacks, they
>>> can
>>>>> react to watermarks to implement event-time callbacks. They have to
>>>>> implement the logic themselves, however. For example, the
>> WindowOperator
>>>>> has custom code to keep track of watermark timers and for reacting to
>>>>> watermarks. User functions have no way of registering timers. Also,
>>> timers
>>>>> are not scoped to any key. So if you register a timer while processing
>>> an
>>>>> element of a certain key, when the timer fires you don’t know what key
>>> was
>>>>> active when registering the timer. This might be necessary for
>> cleaning
>>> up
>>>>> state for certain keys, or to trigger processing for a certain key
>> only,
>>>>> for example with session windows of some kind.
>>>>>> 
>>>>>> Now, on to new stuff. I propose to add a timer facility that can be
>>> used
>>>>> by both operators and user functions. Both partitioned state and
>> timers
>>>>> should be aware of keys and if a timer fires the partitioned state
>>> should
>>>>> be scoped to the same key that was active when the timer was
>> registered.
>>>>>> 
>>>>>> One last bit. If we want to also implement the current WindowOperator
>>> on
>>>>> top of these generic facilities we need to have a way to scope state
>> not
>>>>> only by key but also by windows (or better, some generic state scope).
>>> The
>>>>> reason is, that one key can have several active windows at one point
>> in
>>>>> time and firing timers need to me mapped to the correct window (for
>>>>> example, for sliding windows, or session windows or what have you…).
>>>>>> 
>>>>>> Happy discussing. :D
>>>>>> 
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>> 
>>>>>> 
>>>>> 
>>> 
>>> 
>> 


Re: [DISCUSS] Improving State/Timers/Windows

Posted by Kostas Tzoumas <kt...@apache.org>.
oh, sorry, I misread. Just my +1 to renaming OperatorState then :-)

On Mon, Dec 14, 2015 at 11:38 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> As I mentioned in my previous mail, I think that OperatorState would need
> be replaced by more specific types of state (ValueState, ListState, …).
>
> > On 14 Dec 2015, at 11:34, Maximilian Michels <mx...@apache.org> wrote:
> >
> >>
> >> On a side not, why would you call it KvState? And what would be called
> >> KvState?
> >
> >
> > The OperatorState interface would be called KvState.
> >
> >
> > On Mon, Dec 14, 2015 at 11:18 AM, Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> >> Yes, as Kostas said, it would initially nor provide more functionality
> but
> >> it would enable us to add it later.
> >>
> >> On a side not, why would you call it KvState? And what would be called
> >> KvState?
> >>
> >>> On 14 Dec 2015, at 11:14, Kostas Tzoumas <kt...@apache.org> wrote:
> >>>
> >>> I suppose that they can start as sugar and evolve to a different
> >>> implementation.
> >>>
> >>> I would +1 the name change to KVState, OperatorState is indeed somewhat
> >>> confusing, and it will only get harder to rename later.
> >>>
> >>> On Mon, Dec 14, 2015 at 11:09 AM, Gyula Fóra <gy...@gmail.com>
> >> wrote:
> >>>
> >>>> Would the Reducing/Folding states just be some API sugar on top of
> what
> >> we
> >>>> have know (ValueState) or does it have some added functionality (like
> >>>> incremental checkpoints for list states)?
> >>>>
> >>>> Gyula
> >>>>
> >>>> Aljoscha Krettek <al...@apache.org> ezt írta (időpont: 2015. dec.
> >> 14.,
> >>>> H, 11:03):
> >>>>
> >>>>> While enhancing the state interfaces we would also need to introduce
> >> new
> >>>>> types of state. I was thinking of these, for a start:
> >>>>> - ValueState (works like OperatorState works now, i.e. provides
> methods
> >>>>> to get/set one state value
> >>>>> - ListState, proves methods to add one element to a list of elements
> >> and
> >>>>> to iterate over all contained elements
> >>>>> - ReducingState, somewhat similar to value state but combines the
> added
> >>>>> value to the existing value using a ReduceFunction
> >>>>> - FoldingState, same as above but with fold
> >>>>>
> >>>>> I think these are necessary to give the system more knowledge about
> the
> >>>>> semantics of state so that it can handle the state more efficiently.
> >>>> Think
> >>>>> of incremental checkpoints, for example, these are easy to do if you
> >> know
> >>>>> that state is a list to which stuff is only appended.
> >>>>>> On 14 Dec 2015, at 10:52, Stephan Ewen <se...@apache.org> wrote:
> >>>>>>
> >>>>>> A lot of this makes sense, but I am not sure about renaming
> >>>>>> "OperatorState". The other name is nicer, but why make users' life
> >> hard
> >>>>>> just for a name?
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <
> mxm@apache.org>
> >>>>> wrote:
> >>>>>>
> >>>>>>> Hi Aljoscha,
> >>>>>>>
> >>>>>>> Thanks for the informative technical description.
> >>>>>>>
> >>>>>>>> - function state: this is the state that you get when a user
> >> function
> >>>>>>> implements the Checkpointed interface. it is not partitioned
> >>>>>>>> - operator state: This is the state that a StreamOperator can
> >>>> snapshot,
> >>>>>>> it is similar to the function state, but for operators. it is not
> >>>>>>> partitioned
> >>>>>>>> - partitioned state: state that is scoped to the key of the
> incoming
> >>>>>>> element, in Flink, this is (confusingly) called OperatorState and
> >>>>> KvState
> >>>>>>> (internally)
> >>>>>>>
> >>>>>>> Let's clean that up! Let's rename the OperatorState interface to
> >>>>> KvState.
> >>>>>>>
> >>>>>>>> Both stream operators and user functions can have partitioned
> state,
> >>>>> and
> >>>>>>> the namespace is the same, i.e. the state can clash. The
> partitioned
> >>>>> state
> >>>>>>> will stay indefinitely if not manually cleared.
> >>>>>>>
> >>>>>>> I suppose operators currently have to take care to use a unique
> >>>>>>> identifier for the state such that it doesn't clash with the user
> >>>>>>> function. Wouldn't be too hard to introduce a scoping here.
> >>>>>>>
> >>>>>>> Your proposal makes sense. It seems like this is a rather delicate
> >>>>>>> change which improves the flexibility of the streaming API. What is
> >>>>>>> the motivation behind this? I suppose you are thinking of
> >> improvements
> >>>>>>> to the session capabilities of the streaming API.
> >>>>>>>
> >>>>>>>> If we want to also implement the current WindowOperator on top of
> >>>> these
> >>>>>>> generic facilities we need to have a way to scope state not only by
> >>>> key
> >>>>> but
> >>>>>>> also by windows (or better, some generic state scope).
> >>>>>>>
> >>>>>>> This is currently handled by the WindowOperator itself and would
> then
> >>>>>>> be delegated to the enhanced state interface? Makes sense if we
> want
> >>>>>>> to make use of the new state interface. Again, is it just cleaner
> or
> >>>>>>> does this enable new type of applications?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Max
> >>>>>>>
> >>>>>>> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <
> >>>> aljoscha@apache.org>
> >>>>>>> wrote:
> >>>>>>>> Hi All,
> >>>>>>>> I want to discuss some ideas about improving the
> >>>> primitives/operations
> >>>>>>> that Flink offers for user-state, timers and windows and how these
> >>>>> concepts
> >>>>>>> can be unified.
> >>>>>>>>
> >>>>>>>> It has come up a lot lately that people have very specific
> >>>> requirements
> >>>>>>> regarding the state that they keep and it seems necessary to allows
> >>>>> users
> >>>>>>> to set their own custom timers (on processing time and watermark
> time
> >>>>>>> (event-time)) to do both expiration of state and implementation of
> >>>>> custom
> >>>>>>> windowing semantics. While we’re at this, we might also think about
> >>>>>>> cleaning up the state handling a bit.
> >>>>>>>>
> >>>>>>>> Let me first describe the status quo, so that we’re all on the
> same
> >>>>>>> page. There are three types of state:
> >>>>>>>> - function state: this is the state that you get when a user
> >> function
> >>>>>>> implements the Checkpointed interface. it is not partitioned
> >>>>>>>> - operator state: This is the state that a StreamOperator can
> >>>> snapshot,
> >>>>>>> it is similar to the function state, but for operators. it is not
> >>>>>>> partitioned
> >>>>>>>> - partitioned state: state that is scoped to the key of the
> incoming
> >>>>>>> element, in Flink, this is (confusingly) called OperatorState and
> >>>>> KvState
> >>>>>>> (internally)
> >>>>>>>>
> >>>>>>>> (Operator is the low-level concept, user functions are usually
> >>>> invoked
> >>>>>>> by the operator, for example StreamMap is the operator that
> handles a
> >>>>>>> MapFunction.)
> >>>>>>>>
> >>>>>>>> Function state and operator state is not partitioned, meaning that
> >> it
> >>>>>>> becomes difficult when we want to implement dynamic
> >>>> scale-in/scale-out.
> >>>>>>> With partitioned state it can be redistributed when changing the
> >>>> degree
> >>>>> of
> >>>>>>> parallelism.
> >>>>>>>>
> >>>>>>>> Both stream operators and user functions can have partitioned
> state,
> >>>>> and
> >>>>>>> the namespace is the same, i.e. the state can clash. The
> partitioned
> >>>>> state
> >>>>>>> will stay indefinitely if not manually cleared.
> >>>>>>>>
> >>>>>>>> On to timers, operators can register processing-time callbacks,
> they
> >>>>> can
> >>>>>>> react to watermarks to implement event-time callbacks. They have to
> >>>>>>> implement the logic themselves, however. For example, the
> >>>> WindowOperator
> >>>>>>> has custom code to keep track of watermark timers and for reacting
> to
> >>>>>>> watermarks. User functions have no way of registering timers. Also,
> >>>>> timers
> >>>>>>> are not scoped to any key. So if you register a timer while
> >> processing
> >>>>> an
> >>>>>>> element of a certain key, when the timer fires you don’t know what
> >> key
> >>>>> was
> >>>>>>> active when registering the timer. This might be necessary for
> >>>> cleaning
> >>>>> up
> >>>>>>> state for certain keys, or to trigger processing for a certain key
> >>>> only,
> >>>>>>> for example with session windows of some kind.
> >>>>>>>>
> >>>>>>>> Now, on to new stuff. I propose to add a timer facility that can
> be
> >>>>> used
> >>>>>>> by both operators and user functions. Both partitioned state and
> >>>> timers
> >>>>>>> should be aware of keys and if a timer fires the partitioned state
> >>>>> should
> >>>>>>> be scoped to the same key that was active when the timer was
> >>>> registered.
> >>>>>>>>
> >>>>>>>> One last bit. If we want to also implement the current
> >> WindowOperator
> >>>>> on
> >>>>>>> top of these generic facilities we need to have a way to scope
> state
> >>>> not
> >>>>>>> only by key but also by windows (or better, some generic state
> >> scope).
> >>>>> The
> >>>>>>> reason is, that one key can have several active windows at one
> point
> >>>> in
> >>>>>>> time and firing timers need to me mapped to the correct window (for
> >>>>>>> example, for sliding windows, or session windows or what have
> you…).
> >>>>>>>>
> >>>>>>>> Happy discussing. :D
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Aljoscha
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>>
> >>
> >>
>
>

Re: [DISCUSS] Improving State/Timers/Windows

Posted by Aljoscha Krettek <al...@apache.org>.
As I mentioned in my previous mail, I think that OperatorState would need be replaced by more specific types of state (ValueState, ListState, …).

> On 14 Dec 2015, at 11:34, Maximilian Michels <mx...@apache.org> wrote:
> 
>> 
>> On a side not, why would you call it KvState? And what would be called
>> KvState?
> 
> 
> The OperatorState interface would be called KvState.
> 
> 
> On Mon, Dec 14, 2015 at 11:18 AM, Aljoscha Krettek <al...@apache.org>
> wrote:
> 
>> Yes, as Kostas said, it would initially nor provide more functionality but
>> it would enable us to add it later.
>> 
>> On a side not, why would you call it KvState? And what would be called
>> KvState?
>> 
>>> On 14 Dec 2015, at 11:14, Kostas Tzoumas <kt...@apache.org> wrote:
>>> 
>>> I suppose that they can start as sugar and evolve to a different
>>> implementation.
>>> 
>>> I would +1 the name change to KVState, OperatorState is indeed somewhat
>>> confusing, and it will only get harder to rename later.
>>> 
>>> On Mon, Dec 14, 2015 at 11:09 AM, Gyula Fóra <gy...@gmail.com>
>> wrote:
>>> 
>>>> Would the Reducing/Folding states just be some API sugar on top of what
>> we
>>>> have know (ValueState) or does it have some added functionality (like
>>>> incremental checkpoints for list states)?
>>>> 
>>>> Gyula
>>>> 
>>>> Aljoscha Krettek <al...@apache.org> ezt írta (időpont: 2015. dec.
>> 14.,
>>>> H, 11:03):
>>>> 
>>>>> While enhancing the state interfaces we would also need to introduce
>> new
>>>>> types of state. I was thinking of these, for a start:
>>>>> - ValueState (works like OperatorState works now, i.e. provides methods
>>>>> to get/set one state value
>>>>> - ListState, proves methods to add one element to a list of elements
>> and
>>>>> to iterate over all contained elements
>>>>> - ReducingState, somewhat similar to value state but combines the added
>>>>> value to the existing value using a ReduceFunction
>>>>> - FoldingState, same as above but with fold
>>>>> 
>>>>> I think these are necessary to give the system more knowledge about the
>>>>> semantics of state so that it can handle the state more efficiently.
>>>> Think
>>>>> of incremental checkpoints, for example, these are easy to do if you
>> know
>>>>> that state is a list to which stuff is only appended.
>>>>>> On 14 Dec 2015, at 10:52, Stephan Ewen <se...@apache.org> wrote:
>>>>>> 
>>>>>> A lot of this makes sense, but I am not sure about renaming
>>>>>> "OperatorState". The other name is nicer, but why make users' life
>> hard
>>>>>> just for a name?
>>>>>> 
>>>>>> 
>>>>>> On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <mx...@apache.org>
>>>>> wrote:
>>>>>> 
>>>>>>> Hi Aljoscha,
>>>>>>> 
>>>>>>> Thanks for the informative technical description.
>>>>>>> 
>>>>>>>> - function state: this is the state that you get when a user
>> function
>>>>>>> implements the Checkpointed interface. it is not partitioned
>>>>>>>> - operator state: This is the state that a StreamOperator can
>>>> snapshot,
>>>>>>> it is similar to the function state, but for operators. it is not
>>>>>>> partitioned
>>>>>>>> - partitioned state: state that is scoped to the key of the incoming
>>>>>>> element, in Flink, this is (confusingly) called OperatorState and
>>>>> KvState
>>>>>>> (internally)
>>>>>>> 
>>>>>>> Let's clean that up! Let's rename the OperatorState interface to
>>>>> KvState.
>>>>>>> 
>>>>>>>> Both stream operators and user functions can have partitioned state,
>>>>> and
>>>>>>> the namespace is the same, i.e. the state can clash. The partitioned
>>>>> state
>>>>>>> will stay indefinitely if not manually cleared.
>>>>>>> 
>>>>>>> I suppose operators currently have to take care to use a unique
>>>>>>> identifier for the state such that it doesn't clash with the user
>>>>>>> function. Wouldn't be too hard to introduce a scoping here.
>>>>>>> 
>>>>>>> Your proposal makes sense. It seems like this is a rather delicate
>>>>>>> change which improves the flexibility of the streaming API. What is
>>>>>>> the motivation behind this? I suppose you are thinking of
>> improvements
>>>>>>> to the session capabilities of the streaming API.
>>>>>>> 
>>>>>>>> If we want to also implement the current WindowOperator on top of
>>>> these
>>>>>>> generic facilities we need to have a way to scope state not only by
>>>> key
>>>>> but
>>>>>>> also by windows (or better, some generic state scope).
>>>>>>> 
>>>>>>> This is currently handled by the WindowOperator itself and would then
>>>>>>> be delegated to the enhanced state interface? Makes sense if we want
>>>>>>> to make use of the new state interface. Again, is it just cleaner or
>>>>>>> does this enable new type of applications?
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>> 
>>>>>>> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <
>>>> aljoscha@apache.org>
>>>>>>> wrote:
>>>>>>>> Hi All,
>>>>>>>> I want to discuss some ideas about improving the
>>>> primitives/operations
>>>>>>> that Flink offers for user-state, timers and windows and how these
>>>>> concepts
>>>>>>> can be unified.
>>>>>>>> 
>>>>>>>> It has come up a lot lately that people have very specific
>>>> requirements
>>>>>>> regarding the state that they keep and it seems necessary to allows
>>>>> users
>>>>>>> to set their own custom timers (on processing time and watermark time
>>>>>>> (event-time)) to do both expiration of state and implementation of
>>>>> custom
>>>>>>> windowing semantics. While we’re at this, we might also think about
>>>>>>> cleaning up the state handling a bit.
>>>>>>>> 
>>>>>>>> Let me first describe the status quo, so that we’re all on the same
>>>>>>> page. There are three types of state:
>>>>>>>> - function state: this is the state that you get when a user
>> function
>>>>>>> implements the Checkpointed interface. it is not partitioned
>>>>>>>> - operator state: This is the state that a StreamOperator can
>>>> snapshot,
>>>>>>> it is similar to the function state, but for operators. it is not
>>>>>>> partitioned
>>>>>>>> - partitioned state: state that is scoped to the key of the incoming
>>>>>>> element, in Flink, this is (confusingly) called OperatorState and
>>>>> KvState
>>>>>>> (internally)
>>>>>>>> 
>>>>>>>> (Operator is the low-level concept, user functions are usually
>>>> invoked
>>>>>>> by the operator, for example StreamMap is the operator that handles a
>>>>>>> MapFunction.)
>>>>>>>> 
>>>>>>>> Function state and operator state is not partitioned, meaning that
>> it
>>>>>>> becomes difficult when we want to implement dynamic
>>>> scale-in/scale-out.
>>>>>>> With partitioned state it can be redistributed when changing the
>>>> degree
>>>>> of
>>>>>>> parallelism.
>>>>>>>> 
>>>>>>>> Both stream operators and user functions can have partitioned state,
>>>>> and
>>>>>>> the namespace is the same, i.e. the state can clash. The partitioned
>>>>> state
>>>>>>> will stay indefinitely if not manually cleared.
>>>>>>>> 
>>>>>>>> On to timers, operators can register processing-time callbacks, they
>>>>> can
>>>>>>> react to watermarks to implement event-time callbacks. They have to
>>>>>>> implement the logic themselves, however. For example, the
>>>> WindowOperator
>>>>>>> has custom code to keep track of watermark timers and for reacting to
>>>>>>> watermarks. User functions have no way of registering timers. Also,
>>>>> timers
>>>>>>> are not scoped to any key. So if you register a timer while
>> processing
>>>>> an
>>>>>>> element of a certain key, when the timer fires you don’t know what
>> key
>>>>> was
>>>>>>> active when registering the timer. This might be necessary for
>>>> cleaning
>>>>> up
>>>>>>> state for certain keys, or to trigger processing for a certain key
>>>> only,
>>>>>>> for example with session windows of some kind.
>>>>>>>> 
>>>>>>>> Now, on to new stuff. I propose to add a timer facility that can be
>>>>> used
>>>>>>> by both operators and user functions. Both partitioned state and
>>>> timers
>>>>>>> should be aware of keys and if a timer fires the partitioned state
>>>>> should
>>>>>>> be scoped to the same key that was active when the timer was
>>>> registered.
>>>>>>>> 
>>>>>>>> One last bit. If we want to also implement the current
>> WindowOperator
>>>>> on
>>>>>>> top of these generic facilities we need to have a way to scope state
>>>> not
>>>>>>> only by key but also by windows (or better, some generic state
>> scope).
>>>>> The
>>>>>>> reason is, that one key can have several active windows at one point
>>>> in
>>>>>>> time and firing timers need to me mapped to the correct window (for
>>>>>>> example, for sliding windows, or session windows or what have you…).
>>>>>>>> 
>>>>>>>> Happy discussing. :D
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>> 
>> 
>> 


Re: [DISCUSS] Improving State/Timers/Windows

Posted by Maximilian Michels <mx...@apache.org>.
>
> On a side not, why would you call it KvState? And what would be called
> KvState?


The OperatorState interface would be called KvState.


On Mon, Dec 14, 2015 at 11:18 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Yes, as Kostas said, it would initially nor provide more functionality but
> it would enable us to add it later.
>
> On a side not, why would you call it KvState? And what would be called
> KvState?
>
> > On 14 Dec 2015, at 11:14, Kostas Tzoumas <kt...@apache.org> wrote:
> >
> > I suppose that they can start as sugar and evolve to a different
> > implementation.
> >
> > I would +1 the name change to KVState, OperatorState is indeed somewhat
> > confusing, and it will only get harder to rename later.
> >
> > On Mon, Dec 14, 2015 at 11:09 AM, Gyula Fóra <gy...@gmail.com>
> wrote:
> >
> >> Would the Reducing/Folding states just be some API sugar on top of what
> we
> >> have know (ValueState) or does it have some added functionality (like
> >> incremental checkpoints for list states)?
> >>
> >> Gyula
> >>
> >> Aljoscha Krettek <al...@apache.org> ezt írta (időpont: 2015. dec.
> 14.,
> >> H, 11:03):
> >>
> >>> While enhancing the state interfaces we would also need to introduce
> new
> >>> types of state. I was thinking of these, for a start:
> >>> - ValueState (works like OperatorState works now, i.e. provides methods
> >>> to get/set one state value
> >>> - ListState, proves methods to add one element to a list of elements
> and
> >>> to iterate over all contained elements
> >>> - ReducingState, somewhat similar to value state but combines the added
> >>> value to the existing value using a ReduceFunction
> >>> - FoldingState, same as above but with fold
> >>>
> >>> I think these are necessary to give the system more knowledge about the
> >>> semantics of state so that it can handle the state more efficiently.
> >> Think
> >>> of incremental checkpoints, for example, these are easy to do if you
> know
> >>> that state is a list to which stuff is only appended.
> >>>> On 14 Dec 2015, at 10:52, Stephan Ewen <se...@apache.org> wrote:
> >>>>
> >>>> A lot of this makes sense, but I am not sure about renaming
> >>>> "OperatorState". The other name is nicer, but why make users' life
> hard
> >>>> just for a name?
> >>>>
> >>>>
> >>>> On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <mx...@apache.org>
> >>> wrote:
> >>>>
> >>>>> Hi Aljoscha,
> >>>>>
> >>>>> Thanks for the informative technical description.
> >>>>>
> >>>>>> - function state: this is the state that you get when a user
> function
> >>>>> implements the Checkpointed interface. it is not partitioned
> >>>>>> - operator state: This is the state that a StreamOperator can
> >> snapshot,
> >>>>> it is similar to the function state, but for operators. it is not
> >>>>> partitioned
> >>>>>> - partitioned state: state that is scoped to the key of the incoming
> >>>>> element, in Flink, this is (confusingly) called OperatorState and
> >>> KvState
> >>>>> (internally)
> >>>>>
> >>>>> Let's clean that up! Let's rename the OperatorState interface to
> >>> KvState.
> >>>>>
> >>>>>> Both stream operators and user functions can have partitioned state,
> >>> and
> >>>>> the namespace is the same, i.e. the state can clash. The partitioned
> >>> state
> >>>>> will stay indefinitely if not manually cleared.
> >>>>>
> >>>>> I suppose operators currently have to take care to use a unique
> >>>>> identifier for the state such that it doesn't clash with the user
> >>>>> function. Wouldn't be too hard to introduce a scoping here.
> >>>>>
> >>>>> Your proposal makes sense. It seems like this is a rather delicate
> >>>>> change which improves the flexibility of the streaming API. What is
> >>>>> the motivation behind this? I suppose you are thinking of
> improvements
> >>>>> to the session capabilities of the streaming API.
> >>>>>
> >>>>>> If we want to also implement the current WindowOperator on top of
> >> these
> >>>>> generic facilities we need to have a way to scope state not only by
> >> key
> >>> but
> >>>>> also by windows (or better, some generic state scope).
> >>>>>
> >>>>> This is currently handled by the WindowOperator itself and would then
> >>>>> be delegated to the enhanced state interface? Makes sense if we want
> >>>>> to make use of the new state interface. Again, is it just cleaner or
> >>>>> does this enable new type of applications?
> >>>>>
> >>>>> Cheers,
> >>>>> Max
> >>>>>
> >>>>> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <
> >> aljoscha@apache.org>
> >>>>> wrote:
> >>>>>> Hi All,
> >>>>>> I want to discuss some ideas about improving the
> >> primitives/operations
> >>>>> that Flink offers for user-state, timers and windows and how these
> >>> concepts
> >>>>> can be unified.
> >>>>>>
> >>>>>> It has come up a lot lately that people have very specific
> >> requirements
> >>>>> regarding the state that they keep and it seems necessary to allows
> >>> users
> >>>>> to set their own custom timers (on processing time and watermark time
> >>>>> (event-time)) to do both expiration of state and implementation of
> >>> custom
> >>>>> windowing semantics. While we’re at this, we might also think about
> >>>>> cleaning up the state handling a bit.
> >>>>>>
> >>>>>> Let me first describe the status quo, so that we’re all on the same
> >>>>> page. There are three types of state:
> >>>>>> - function state: this is the state that you get when a user
> function
> >>>>> implements the Checkpointed interface. it is not partitioned
> >>>>>> - operator state: This is the state that a StreamOperator can
> >> snapshot,
> >>>>> it is similar to the function state, but for operators. it is not
> >>>>> partitioned
> >>>>>> - partitioned state: state that is scoped to the key of the incoming
> >>>>> element, in Flink, this is (confusingly) called OperatorState and
> >>> KvState
> >>>>> (internally)
> >>>>>>
> >>>>>> (Operator is the low-level concept, user functions are usually
> >> invoked
> >>>>> by the operator, for example StreamMap is the operator that handles a
> >>>>> MapFunction.)
> >>>>>>
> >>>>>> Function state and operator state is not partitioned, meaning that
> it
> >>>>> becomes difficult when we want to implement dynamic
> >> scale-in/scale-out.
> >>>>> With partitioned state it can be redistributed when changing the
> >> degree
> >>> of
> >>>>> parallelism.
> >>>>>>
> >>>>>> Both stream operators and user functions can have partitioned state,
> >>> and
> >>>>> the namespace is the same, i.e. the state can clash. The partitioned
> >>> state
> >>>>> will stay indefinitely if not manually cleared.
> >>>>>>
> >>>>>> On to timers, operators can register processing-time callbacks, they
> >>> can
> >>>>> react to watermarks to implement event-time callbacks. They have to
> >>>>> implement the logic themselves, however. For example, the
> >> WindowOperator
> >>>>> has custom code to keep track of watermark timers and for reacting to
> >>>>> watermarks. User functions have no way of registering timers. Also,
> >>> timers
> >>>>> are not scoped to any key. So if you register a timer while
> processing
> >>> an
> >>>>> element of a certain key, when the timer fires you don’t know what
> key
> >>> was
> >>>>> active when registering the timer. This might be necessary for
> >> cleaning
> >>> up
> >>>>> state for certain keys, or to trigger processing for a certain key
> >> only,
> >>>>> for example with session windows of some kind.
> >>>>>>
> >>>>>> Now, on to new stuff. I propose to add a timer facility that can be
> >>> used
> >>>>> by both operators and user functions. Both partitioned state and
> >> timers
> >>>>> should be aware of keys and if a timer fires the partitioned state
> >>> should
> >>>>> be scoped to the same key that was active when the timer was
> >> registered.
> >>>>>>
> >>>>>> One last bit. If we want to also implement the current
> WindowOperator
> >>> on
> >>>>> top of these generic facilities we need to have a way to scope state
> >> not
> >>>>> only by key but also by windows (or better, some generic state
> scope).
> >>> The
> >>>>> reason is, that one key can have several active windows at one point
> >> in
> >>>>> time and firing timers need to me mapped to the correct window (for
> >>>>> example, for sliding windows, or session windows or what have you…).
> >>>>>>
> >>>>>> Happy discussing. :D
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Aljoscha
> >>>>>>
> >>>>>>
> >>>>>
> >>>
> >>>
> >>
>
>

Re: [DISCUSS] Improving State/Timers/Windows

Posted by Aljoscha Krettek <al...@apache.org>.
Yes, as Kostas said, it would initially nor provide more functionality but it would enable us to add it later.

On a side not, why would you call it KvState? And what would be called KvState?

> On 14 Dec 2015, at 11:14, Kostas Tzoumas <kt...@apache.org> wrote:
> 
> I suppose that they can start as sugar and evolve to a different
> implementation.
> 
> I would +1 the name change to KVState, OperatorState is indeed somewhat
> confusing, and it will only get harder to rename later.
> 
> On Mon, Dec 14, 2015 at 11:09 AM, Gyula Fóra <gy...@gmail.com> wrote:
> 
>> Would the Reducing/Folding states just be some API sugar on top of what we
>> have know (ValueState) or does it have some added functionality (like
>> incremental checkpoints for list states)?
>> 
>> Gyula
>> 
>> Aljoscha Krettek <al...@apache.org> ezt írta (időpont: 2015. dec. 14.,
>> H, 11:03):
>> 
>>> While enhancing the state interfaces we would also need to introduce new
>>> types of state. I was thinking of these, for a start:
>>> - ValueState (works like OperatorState works now, i.e. provides methods
>>> to get/set one state value
>>> - ListState, proves methods to add one element to a list of elements and
>>> to iterate over all contained elements
>>> - ReducingState, somewhat similar to value state but combines the added
>>> value to the existing value using a ReduceFunction
>>> - FoldingState, same as above but with fold
>>> 
>>> I think these are necessary to give the system more knowledge about the
>>> semantics of state so that it can handle the state more efficiently.
>> Think
>>> of incremental checkpoints, for example, these are easy to do if you know
>>> that state is a list to which stuff is only appended.
>>>> On 14 Dec 2015, at 10:52, Stephan Ewen <se...@apache.org> wrote:
>>>> 
>>>> A lot of this makes sense, but I am not sure about renaming
>>>> "OperatorState". The other name is nicer, but why make users' life hard
>>>> just for a name?
>>>> 
>>>> 
>>>> On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <mx...@apache.org>
>>> wrote:
>>>> 
>>>>> Hi Aljoscha,
>>>>> 
>>>>> Thanks for the informative technical description.
>>>>> 
>>>>>> - function state: this is the state that you get when a user function
>>>>> implements the Checkpointed interface. it is not partitioned
>>>>>> - operator state: This is the state that a StreamOperator can
>> snapshot,
>>>>> it is similar to the function state, but for operators. it is not
>>>>> partitioned
>>>>>> - partitioned state: state that is scoped to the key of the incoming
>>>>> element, in Flink, this is (confusingly) called OperatorState and
>>> KvState
>>>>> (internally)
>>>>> 
>>>>> Let's clean that up! Let's rename the OperatorState interface to
>>> KvState.
>>>>> 
>>>>>> Both stream operators and user functions can have partitioned state,
>>> and
>>>>> the namespace is the same, i.e. the state can clash. The partitioned
>>> state
>>>>> will stay indefinitely if not manually cleared.
>>>>> 
>>>>> I suppose operators currently have to take care to use a unique
>>>>> identifier for the state such that it doesn't clash with the user
>>>>> function. Wouldn't be too hard to introduce a scoping here.
>>>>> 
>>>>> Your proposal makes sense. It seems like this is a rather delicate
>>>>> change which improves the flexibility of the streaming API. What is
>>>>> the motivation behind this? I suppose you are thinking of improvements
>>>>> to the session capabilities of the streaming API.
>>>>> 
>>>>>> If we want to also implement the current WindowOperator on top of
>> these
>>>>> generic facilities we need to have a way to scope state not only by
>> key
>>> but
>>>>> also by windows (or better, some generic state scope).
>>>>> 
>>>>> This is currently handled by the WindowOperator itself and would then
>>>>> be delegated to the enhanced state interface? Makes sense if we want
>>>>> to make use of the new state interface. Again, is it just cleaner or
>>>>> does this enable new type of applications?
>>>>> 
>>>>> Cheers,
>>>>> Max
>>>>> 
>>>>> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <
>> aljoscha@apache.org>
>>>>> wrote:
>>>>>> Hi All,
>>>>>> I want to discuss some ideas about improving the
>> primitives/operations
>>>>> that Flink offers for user-state, timers and windows and how these
>>> concepts
>>>>> can be unified.
>>>>>> 
>>>>>> It has come up a lot lately that people have very specific
>> requirements
>>>>> regarding the state that they keep and it seems necessary to allows
>>> users
>>>>> to set their own custom timers (on processing time and watermark time
>>>>> (event-time)) to do both expiration of state and implementation of
>>> custom
>>>>> windowing semantics. While we’re at this, we might also think about
>>>>> cleaning up the state handling a bit.
>>>>>> 
>>>>>> Let me first describe the status quo, so that we’re all on the same
>>>>> page. There are three types of state:
>>>>>> - function state: this is the state that you get when a user function
>>>>> implements the Checkpointed interface. it is not partitioned
>>>>>> - operator state: This is the state that a StreamOperator can
>> snapshot,
>>>>> it is similar to the function state, but for operators. it is not
>>>>> partitioned
>>>>>> - partitioned state: state that is scoped to the key of the incoming
>>>>> element, in Flink, this is (confusingly) called OperatorState and
>>> KvState
>>>>> (internally)
>>>>>> 
>>>>>> (Operator is the low-level concept, user functions are usually
>> invoked
>>>>> by the operator, for example StreamMap is the operator that handles a
>>>>> MapFunction.)
>>>>>> 
>>>>>> Function state and operator state is not partitioned, meaning that it
>>>>> becomes difficult when we want to implement dynamic
>> scale-in/scale-out.
>>>>> With partitioned state it can be redistributed when changing the
>> degree
>>> of
>>>>> parallelism.
>>>>>> 
>>>>>> Both stream operators and user functions can have partitioned state,
>>> and
>>>>> the namespace is the same, i.e. the state can clash. The partitioned
>>> state
>>>>> will stay indefinitely if not manually cleared.
>>>>>> 
>>>>>> On to timers, operators can register processing-time callbacks, they
>>> can
>>>>> react to watermarks to implement event-time callbacks. They have to
>>>>> implement the logic themselves, however. For example, the
>> WindowOperator
>>>>> has custom code to keep track of watermark timers and for reacting to
>>>>> watermarks. User functions have no way of registering timers. Also,
>>> timers
>>>>> are not scoped to any key. So if you register a timer while processing
>>> an
>>>>> element of a certain key, when the timer fires you don’t know what key
>>> was
>>>>> active when registering the timer. This might be necessary for
>> cleaning
>>> up
>>>>> state for certain keys, or to trigger processing for a certain key
>> only,
>>>>> for example with session windows of some kind.
>>>>>> 
>>>>>> Now, on to new stuff. I propose to add a timer facility that can be
>>> used
>>>>> by both operators and user functions. Both partitioned state and
>> timers
>>>>> should be aware of keys and if a timer fires the partitioned state
>>> should
>>>>> be scoped to the same key that was active when the timer was
>> registered.
>>>>>> 
>>>>>> One last bit. If we want to also implement the current WindowOperator
>>> on
>>>>> top of these generic facilities we need to have a way to scope state
>> not
>>>>> only by key but also by windows (or better, some generic state scope).
>>> The
>>>>> reason is, that one key can have several active windows at one point
>> in
>>>>> time and firing timers need to me mapped to the correct window (for
>>>>> example, for sliding windows, or session windows or what have you…).
>>>>>> 
>>>>>> Happy discussing. :D
>>>>>> 
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>> 
>>>>>> 
>>>>> 
>>> 
>>> 
>> 


Re: [DISCUSS] Improving State/Timers/Windows

Posted by Kostas Tzoumas <kt...@apache.org>.
I suppose that they can start as sugar and evolve to a different
implementation.

I would +1 the name change to KVState, OperatorState is indeed somewhat
confusing, and it will only get harder to rename later.

On Mon, Dec 14, 2015 at 11:09 AM, Gyula Fóra <gy...@gmail.com> wrote:

> Would the Reducing/Folding states just be some API sugar on top of what we
> have know (ValueState) or does it have some added functionality (like
> incremental checkpoints for list states)?
>
> Gyula
>
> Aljoscha Krettek <al...@apache.org> ezt írta (időpont: 2015. dec. 14.,
> H, 11:03):
>
> > While enhancing the state interfaces we would also need to introduce new
> > types of state. I was thinking of these, for a start:
> >  - ValueState (works like OperatorState works now, i.e. provides methods
> > to get/set one state value
> >  - ListState, proves methods to add one element to a list of elements and
> > to iterate over all contained elements
> >  - ReducingState, somewhat similar to value state but combines the added
> > value to the existing value using a ReduceFunction
> >  - FoldingState, same as above but with fold
> >
> > I think these are necessary to give the system more knowledge about the
> > semantics of state so that it can handle the state more efficiently.
> Think
> > of incremental checkpoints, for example, these are easy to do if you know
> > that state is a list to which stuff is only appended.
> > > On 14 Dec 2015, at 10:52, Stephan Ewen <se...@apache.org> wrote:
> > >
> > > A lot of this makes sense, but I am not sure about renaming
> > > "OperatorState". The other name is nicer, but why make users' life hard
> > > just for a name?
> > >
> > >
> > > On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <mx...@apache.org>
> > wrote:
> > >
> > >> Hi Aljoscha,
> > >>
> > >> Thanks for the informative technical description.
> > >>
> > >>> - function state: this is the state that you get when a user function
> > >> implements the Checkpointed interface. it is not partitioned
> > >>> - operator state: This is the state that a StreamOperator can
> snapshot,
> > >> it is similar to the function state, but for operators. it is not
> > >> partitioned
> > >>> - partitioned state: state that is scoped to the key of the incoming
> > >> element, in Flink, this is (confusingly) called OperatorState and
> > KvState
> > >> (internally)
> > >>
> > >> Let's clean that up! Let's rename the OperatorState interface to
> > KvState.
> > >>
> > >>> Both stream operators and user functions can have partitioned state,
> > and
> > >> the namespace is the same, i.e. the state can clash. The partitioned
> > state
> > >> will stay indefinitely if not manually cleared.
> > >>
> > >> I suppose operators currently have to take care to use a unique
> > >> identifier for the state such that it doesn't clash with the user
> > >> function. Wouldn't be too hard to introduce a scoping here.
> > >>
> > >> Your proposal makes sense. It seems like this is a rather delicate
> > >> change which improves the flexibility of the streaming API. What is
> > >> the motivation behind this? I suppose you are thinking of improvements
> > >> to the session capabilities of the streaming API.
> > >>
> > >>> If we want to also implement the current WindowOperator on top of
> these
> > >> generic facilities we need to have a way to scope state not only by
> key
> > but
> > >> also by windows (or better, some generic state scope).
> > >>
> > >> This is currently handled by the WindowOperator itself and would then
> > >> be delegated to the enhanced state interface? Makes sense if we want
> > >> to make use of the new state interface. Again, is it just cleaner or
> > >> does this enable new type of applications?
> > >>
> > >> Cheers,
> > >> Max
> > >>
> > >> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <
> aljoscha@apache.org>
> > >> wrote:
> > >>> Hi All,
> > >>> I want to discuss some ideas about improving the
> primitives/operations
> > >> that Flink offers for user-state, timers and windows and how these
> > concepts
> > >> can be unified.
> > >>>
> > >>> It has come up a lot lately that people have very specific
> requirements
> > >> regarding the state that they keep and it seems necessary to allows
> > users
> > >> to set their own custom timers (on processing time and watermark time
> > >> (event-time)) to do both expiration of state and implementation of
> > custom
> > >> windowing semantics. While we’re at this, we might also think about
> > >> cleaning up the state handling a bit.
> > >>>
> > >>> Let me first describe the status quo, so that we’re all on the same
> > >> page. There are three types of state:
> > >>> - function state: this is the state that you get when a user function
> > >> implements the Checkpointed interface. it is not partitioned
> > >>> - operator state: This is the state that a StreamOperator can
> snapshot,
> > >> it is similar to the function state, but for operators. it is not
> > >> partitioned
> > >>> - partitioned state: state that is scoped to the key of the incoming
> > >> element, in Flink, this is (confusingly) called OperatorState and
> > KvState
> > >> (internally)
> > >>>
> > >>> (Operator is the low-level concept, user functions are usually
> invoked
> > >> by the operator, for example StreamMap is the operator that handles a
> > >> MapFunction.)
> > >>>
> > >>> Function state and operator state is not partitioned, meaning that it
> > >> becomes difficult when we want to implement dynamic
> scale-in/scale-out.
> > >> With partitioned state it can be redistributed when changing the
> degree
> > of
> > >> parallelism.
> > >>>
> > >>> Both stream operators and user functions can have partitioned state,
> > and
> > >> the namespace is the same, i.e. the state can clash. The partitioned
> > state
> > >> will stay indefinitely if not manually cleared.
> > >>>
> > >>> On to timers, operators can register processing-time callbacks, they
> > can
> > >> react to watermarks to implement event-time callbacks. They have to
> > >> implement the logic themselves, however. For example, the
> WindowOperator
> > >> has custom code to keep track of watermark timers and for reacting to
> > >> watermarks. User functions have no way of registering timers. Also,
> > timers
> > >> are not scoped to any key. So if you register a timer while processing
> > an
> > >> element of a certain key, when the timer fires you don’t know what key
> > was
> > >> active when registering the timer. This might be necessary for
> cleaning
> > up
> > >> state for certain keys, or to trigger processing for a certain key
> only,
> > >> for example with session windows of some kind.
> > >>>
> > >>> Now, on to new stuff. I propose to add a timer facility that can be
> > used
> > >> by both operators and user functions. Both partitioned state and
> timers
> > >> should be aware of keys and if a timer fires the partitioned state
> > should
> > >> be scoped to the same key that was active when the timer was
> registered.
> > >>>
> > >>> One last bit. If we want to also implement the current WindowOperator
> > on
> > >> top of these generic facilities we need to have a way to scope state
> not
> > >> only by key but also by windows (or better, some generic state scope).
> > The
> > >> reason is, that one key can have several active windows at one point
> in
> > >> time and firing timers need to me mapped to the correct window (for
> > >> example, for sliding windows, or session windows or what have you…).
> > >>>
> > >>> Happy discussing. :D
> > >>>
> > >>> Cheers,
> > >>> Aljoscha
> > >>>
> > >>>
> > >>
> >
> >
>

Re: [DISCUSS] Improving State/Timers/Windows

Posted by Gyula Fóra <gy...@gmail.com>.
Would the Reducing/Folding states just be some API sugar on top of what we
have know (ValueState) or does it have some added functionality (like
incremental checkpoints for list states)?

Gyula

Aljoscha Krettek <al...@apache.org> ezt írta (időpont: 2015. dec. 14.,
H, 11:03):

> While enhancing the state interfaces we would also need to introduce new
> types of state. I was thinking of these, for a start:
>  - ValueState (works like OperatorState works now, i.e. provides methods
> to get/set one state value
>  - ListState, proves methods to add one element to a list of elements and
> to iterate over all contained elements
>  - ReducingState, somewhat similar to value state but combines the added
> value to the existing value using a ReduceFunction
>  - FoldingState, same as above but with fold
>
> I think these are necessary to give the system more knowledge about the
> semantics of state so that it can handle the state more efficiently. Think
> of incremental checkpoints, for example, these are easy to do if you know
> that state is a list to which stuff is only appended.
> > On 14 Dec 2015, at 10:52, Stephan Ewen <se...@apache.org> wrote:
> >
> > A lot of this makes sense, but I am not sure about renaming
> > "OperatorState". The other name is nicer, but why make users' life hard
> > just for a name?
> >
> >
> > On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <mx...@apache.org>
> wrote:
> >
> >> Hi Aljoscha,
> >>
> >> Thanks for the informative technical description.
> >>
> >>> - function state: this is the state that you get when a user function
> >> implements the Checkpointed interface. it is not partitioned
> >>> - operator state: This is the state that a StreamOperator can snapshot,
> >> it is similar to the function state, but for operators. it is not
> >> partitioned
> >>> - partitioned state: state that is scoped to the key of the incoming
> >> element, in Flink, this is (confusingly) called OperatorState and
> KvState
> >> (internally)
> >>
> >> Let's clean that up! Let's rename the OperatorState interface to
> KvState.
> >>
> >>> Both stream operators and user functions can have partitioned state,
> and
> >> the namespace is the same, i.e. the state can clash. The partitioned
> state
> >> will stay indefinitely if not manually cleared.
> >>
> >> I suppose operators currently have to take care to use a unique
> >> identifier for the state such that it doesn't clash with the user
> >> function. Wouldn't be too hard to introduce a scoping here.
> >>
> >> Your proposal makes sense. It seems like this is a rather delicate
> >> change which improves the flexibility of the streaming API. What is
> >> the motivation behind this? I suppose you are thinking of improvements
> >> to the session capabilities of the streaming API.
> >>
> >>> If we want to also implement the current WindowOperator on top of these
> >> generic facilities we need to have a way to scope state not only by key
> but
> >> also by windows (or better, some generic state scope).
> >>
> >> This is currently handled by the WindowOperator itself and would then
> >> be delegated to the enhanced state interface? Makes sense if we want
> >> to make use of the new state interface. Again, is it just cleaner or
> >> does this enable new type of applications?
> >>
> >> Cheers,
> >> Max
> >>
> >> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <al...@apache.org>
> >> wrote:
> >>> Hi All,
> >>> I want to discuss some ideas about improving the primitives/operations
> >> that Flink offers for user-state, timers and windows and how these
> concepts
> >> can be unified.
> >>>
> >>> It has come up a lot lately that people have very specific requirements
> >> regarding the state that they keep and it seems necessary to allows
> users
> >> to set their own custom timers (on processing time and watermark time
> >> (event-time)) to do both expiration of state and implementation of
> custom
> >> windowing semantics. While we’re at this, we might also think about
> >> cleaning up the state handling a bit.
> >>>
> >>> Let me first describe the status quo, so that we’re all on the same
> >> page. There are three types of state:
> >>> - function state: this is the state that you get when a user function
> >> implements the Checkpointed interface. it is not partitioned
> >>> - operator state: This is the state that a StreamOperator can snapshot,
> >> it is similar to the function state, but for operators. it is not
> >> partitioned
> >>> - partitioned state: state that is scoped to the key of the incoming
> >> element, in Flink, this is (confusingly) called OperatorState and
> KvState
> >> (internally)
> >>>
> >>> (Operator is the low-level concept, user functions are usually invoked
> >> by the operator, for example StreamMap is the operator that handles a
> >> MapFunction.)
> >>>
> >>> Function state and operator state is not partitioned, meaning that it
> >> becomes difficult when we want to implement dynamic scale-in/scale-out.
> >> With partitioned state it can be redistributed when changing the degree
> of
> >> parallelism.
> >>>
> >>> Both stream operators and user functions can have partitioned state,
> and
> >> the namespace is the same, i.e. the state can clash. The partitioned
> state
> >> will stay indefinitely if not manually cleared.
> >>>
> >>> On to timers, operators can register processing-time callbacks, they
> can
> >> react to watermarks to implement event-time callbacks. They have to
> >> implement the logic themselves, however. For example, the WindowOperator
> >> has custom code to keep track of watermark timers and for reacting to
> >> watermarks. User functions have no way of registering timers. Also,
> timers
> >> are not scoped to any key. So if you register a timer while processing
> an
> >> element of a certain key, when the timer fires you don’t know what key
> was
> >> active when registering the timer. This might be necessary for cleaning
> up
> >> state for certain keys, or to trigger processing for a certain key only,
> >> for example with session windows of some kind.
> >>>
> >>> Now, on to new stuff. I propose to add a timer facility that can be
> used
> >> by both operators and user functions. Both partitioned state and timers
> >> should be aware of keys and if a timer fires the partitioned state
> should
> >> be scoped to the same key that was active when the timer was registered.
> >>>
> >>> One last bit. If we want to also implement the current WindowOperator
> on
> >> top of these generic facilities we need to have a way to scope state not
> >> only by key but also by windows (or better, some generic state scope).
> The
> >> reason is, that one key can have several active windows at one point in
> >> time and firing timers need to me mapped to the correct window (for
> >> example, for sliding windows, or session windows or what have you…).
> >>>
> >>> Happy discussing. :D
> >>>
> >>> Cheers,
> >>> Aljoscha
> >>>
> >>>
> >>
>
>

Re: [DISCUSS] Improving State/Timers/Windows

Posted by Aljoscha Krettek <al...@apache.org>.
While enhancing the state interfaces we would also need to introduce new types of state. I was thinking of these, for a start:
 - ValueState (works like OperatorState works now, i.e. provides methods to get/set one state value
 - ListState, proves methods to add one element to a list of elements and to iterate over all contained elements
 - ReducingState, somewhat similar to value state but combines the added value to the existing value using a ReduceFunction
 - FoldingState, same as above but with fold

I think these are necessary to give the system more knowledge about the semantics of state so that it can handle the state more efficiently. Think of incremental checkpoints, for example, these are easy to do if you know that state is a list to which stuff is only appended.
> On 14 Dec 2015, at 10:52, Stephan Ewen <se...@apache.org> wrote:
> 
> A lot of this makes sense, but I am not sure about renaming
> "OperatorState". The other name is nicer, but why make users' life hard
> just for a name?
> 
> 
> On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <mx...@apache.org> wrote:
> 
>> Hi Aljoscha,
>> 
>> Thanks for the informative technical description.
>> 
>>> - function state: this is the state that you get when a user function
>> implements the Checkpointed interface. it is not partitioned
>>> - operator state: This is the state that a StreamOperator can snapshot,
>> it is similar to the function state, but for operators. it is not
>> partitioned
>>> - partitioned state: state that is scoped to the key of the incoming
>> element, in Flink, this is (confusingly) called OperatorState and KvState
>> (internally)
>> 
>> Let's clean that up! Let's rename the OperatorState interface to KvState.
>> 
>>> Both stream operators and user functions can have partitioned state, and
>> the namespace is the same, i.e. the state can clash. The partitioned state
>> will stay indefinitely if not manually cleared.
>> 
>> I suppose operators currently have to take care to use a unique
>> identifier for the state such that it doesn't clash with the user
>> function. Wouldn't be too hard to introduce a scoping here.
>> 
>> Your proposal makes sense. It seems like this is a rather delicate
>> change which improves the flexibility of the streaming API. What is
>> the motivation behind this? I suppose you are thinking of improvements
>> to the session capabilities of the streaming API.
>> 
>>> If we want to also implement the current WindowOperator on top of these
>> generic facilities we need to have a way to scope state not only by key but
>> also by windows (or better, some generic state scope).
>> 
>> This is currently handled by the WindowOperator itself and would then
>> be delegated to the enhanced state interface? Makes sense if we want
>> to make use of the new state interface. Again, is it just cleaner or
>> does this enable new type of applications?
>> 
>> Cheers,
>> Max
>> 
>> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <al...@apache.org>
>> wrote:
>>> Hi All,
>>> I want to discuss some ideas about improving the primitives/operations
>> that Flink offers for user-state, timers and windows and how these concepts
>> can be unified.
>>> 
>>> It has come up a lot lately that people have very specific requirements
>> regarding the state that they keep and it seems necessary to allows users
>> to set their own custom timers (on processing time and watermark time
>> (event-time)) to do both expiration of state and implementation of custom
>> windowing semantics. While we’re at this, we might also think about
>> cleaning up the state handling a bit.
>>> 
>>> Let me first describe the status quo, so that we’re all on the same
>> page. There are three types of state:
>>> - function state: this is the state that you get when a user function
>> implements the Checkpointed interface. it is not partitioned
>>> - operator state: This is the state that a StreamOperator can snapshot,
>> it is similar to the function state, but for operators. it is not
>> partitioned
>>> - partitioned state: state that is scoped to the key of the incoming
>> element, in Flink, this is (confusingly) called OperatorState and KvState
>> (internally)
>>> 
>>> (Operator is the low-level concept, user functions are usually invoked
>> by the operator, for example StreamMap is the operator that handles a
>> MapFunction.)
>>> 
>>> Function state and operator state is not partitioned, meaning that it
>> becomes difficult when we want to implement dynamic scale-in/scale-out.
>> With partitioned state it can be redistributed when changing the degree of
>> parallelism.
>>> 
>>> Both stream operators and user functions can have partitioned state, and
>> the namespace is the same, i.e. the state can clash. The partitioned state
>> will stay indefinitely if not manually cleared.
>>> 
>>> On to timers, operators can register processing-time callbacks, they can
>> react to watermarks to implement event-time callbacks. They have to
>> implement the logic themselves, however. For example, the WindowOperator
>> has custom code to keep track of watermark timers and for reacting to
>> watermarks. User functions have no way of registering timers. Also, timers
>> are not scoped to any key. So if you register a timer while processing an
>> element of a certain key, when the timer fires you don’t know what key was
>> active when registering the timer. This might be necessary for cleaning up
>> state for certain keys, or to trigger processing for a certain key only,
>> for example with session windows of some kind.
>>> 
>>> Now, on to new stuff. I propose to add a timer facility that can be used
>> by both operators and user functions. Both partitioned state and timers
>> should be aware of keys and if a timer fires the partitioned state should
>> be scoped to the same key that was active when the timer was registered.
>>> 
>>> One last bit. If we want to also implement the current WindowOperator on
>> top of these generic facilities we need to have a way to scope state not
>> only by key but also by windows (or better, some generic state scope). The
>> reason is, that one key can have several active windows at one point in
>> time and firing timers need to me mapped to the correct window (for
>> example, for sliding windows, or session windows or what have you…).
>>> 
>>> Happy discussing. :D
>>> 
>>> Cheers,
>>> Aljoscha
>>> 
>>> 
>> 


Re: [DISCUSS] Improving State/Timers/Windows

Posted by Stephan Ewen <se...@apache.org>.
A lot of this makes sense, but I am not sure about renaming
"OperatorState". The other name is nicer, but why make users' life hard
just for a name?


On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <mx...@apache.org> wrote:

> Hi Aljoscha,
>
> Thanks for the informative technical description.
>
> >  - function state: this is the state that you get when a user function
> implements the Checkpointed interface. it is not partitioned
> >  - operator state: This is the state that a StreamOperator can snapshot,
> it is similar to the function state, but for operators. it is not
> partitioned
> > - partitioned state: state that is scoped to the key of the incoming
> element, in Flink, this is (confusingly) called OperatorState and KvState
> (internally)
>
> Let's clean that up! Let's rename the OperatorState interface to KvState.
>
> > Both stream operators and user functions can have partitioned state, and
> the namespace is the same, i.e. the state can clash. The partitioned state
> will stay indefinitely if not manually cleared.
>
> I suppose operators currently have to take care to use a unique
> identifier for the state such that it doesn't clash with the user
> function. Wouldn't be too hard to introduce a scoping here.
>
> Your proposal makes sense. It seems like this is a rather delicate
> change which improves the flexibility of the streaming API. What is
> the motivation behind this? I suppose you are thinking of improvements
> to the session capabilities of the streaming API.
>
> > If we want to also implement the current WindowOperator on top of these
> generic facilities we need to have a way to scope state not only by key but
> also by windows (or better, some generic state scope).
>
> This is currently handled by the WindowOperator itself and would then
> be delegated to the enhanced state interface? Makes sense if we want
> to make use of the new state interface. Again, is it just cleaner or
> does this enable new type of applications?
>
> Cheers,
> Max
>
> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <al...@apache.org>
> wrote:
> > Hi All,
> > I want to discuss some ideas about improving the primitives/operations
> that Flink offers for user-state, timers and windows and how these concepts
> can be unified.
> >
> > It has come up a lot lately that people have very specific requirements
> regarding the state that they keep and it seems necessary to allows users
> to set their own custom timers (on processing time and watermark time
> (event-time)) to do both expiration of state and implementation of custom
> windowing semantics. While we’re at this, we might also think about
> cleaning up the state handling a bit.
> >
> > Let me first describe the status quo, so that we’re all on the same
> page. There are three types of state:
> >  - function state: this is the state that you get when a user function
> implements the Checkpointed interface. it is not partitioned
> >  - operator state: This is the state that a StreamOperator can snapshot,
> it is similar to the function state, but for operators. it is not
> partitioned
> >  - partitioned state: state that is scoped to the key of the incoming
> element, in Flink, this is (confusingly) called OperatorState and KvState
> (internally)
> >
> > (Operator is the low-level concept, user functions are usually invoked
> by the operator, for example StreamMap is the operator that handles a
> MapFunction.)
> >
> > Function state and operator state is not partitioned, meaning that it
> becomes difficult when we want to implement dynamic scale-in/scale-out.
> With partitioned state it can be redistributed when changing the degree of
> parallelism.
> >
> > Both stream operators and user functions can have partitioned state, and
> the namespace is the same, i.e. the state can clash. The partitioned state
> will stay indefinitely if not manually cleared.
> >
> > On to timers, operators can register processing-time callbacks, they can
> react to watermarks to implement event-time callbacks. They have to
> implement the logic themselves, however. For example, the WindowOperator
> has custom code to keep track of watermark timers and for reacting to
> watermarks. User functions have no way of registering timers. Also, timers
> are not scoped to any key. So if you register a timer while processing an
> element of a certain key, when the timer fires you don’t know what key was
> active when registering the timer. This might be necessary for cleaning up
> state for certain keys, or to trigger processing for a certain key only,
> for example with session windows of some kind.
> >
> > Now, on to new stuff. I propose to add a timer facility that can be used
> by both operators and user functions. Both partitioned state and timers
> should be aware of keys and if a timer fires the partitioned state should
> be scoped to the same key that was active when the timer was registered.
> >
> > One last bit. If we want to also implement the current WindowOperator on
> top of these generic facilities we need to have a way to scope state not
> only by key but also by windows (or better, some generic state scope). The
> reason is, that one key can have several active windows at one point in
> time and firing timers need to me mapped to the correct window (for
> example, for sliding windows, or session windows or what have you…).
> >
> > Happy discussing. :D
> >
> > Cheers,
> > Aljoscha
> >
> >
>

Re: [DISCUSS] Improving State/Timers/Windows

Posted by Maximilian Michels <mx...@apache.org>.
Hi Aljoscha,

Thanks for the informative technical description.

>  - function state: this is the state that you get when a user function implements the Checkpointed interface. it is not partitioned
>  - operator state: This is the state that a StreamOperator can snapshot, it is similar to the function state, but for operators. it is not partitioned
> - partitioned state: state that is scoped to the key of the incoming element, in Flink, this is (confusingly) called OperatorState and KvState (internally)

Let's clean that up! Let's rename the OperatorState interface to KvState.

> Both stream operators and user functions can have partitioned state, and the namespace is the same, i.e. the state can clash. The partitioned state will stay indefinitely if not manually cleared.

I suppose operators currently have to take care to use a unique
identifier for the state such that it doesn't clash with the user
function. Wouldn't be too hard to introduce a scoping here.

Your proposal makes sense. It seems like this is a rather delicate
change which improves the flexibility of the streaming API. What is
the motivation behind this? I suppose you are thinking of improvements
to the session capabilities of the streaming API.

> If we want to also implement the current WindowOperator on top of these generic facilities we need to have a way to scope state not only by key but also by windows (or better, some generic state scope).

This is currently handled by the WindowOperator itself and would then
be delegated to the enhanced state interface? Makes sense if we want
to make use of the new state interface. Again, is it just cleaner or
does this enable new type of applications?

Cheers,
Max

On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <al...@apache.org> wrote:
> Hi All,
> I want to discuss some ideas about improving the primitives/operations that Flink offers for user-state, timers and windows and how these concepts can be unified.
>
> It has come up a lot lately that people have very specific requirements regarding the state that they keep and it seems necessary to allows users to set their own custom timers (on processing time and watermark time (event-time)) to do both expiration of state and implementation of custom windowing semantics. While we’re at this, we might also think about cleaning up the state handling a bit.
>
> Let me first describe the status quo, so that we’re all on the same page. There are three types of state:
>  - function state: this is the state that you get when a user function implements the Checkpointed interface. it is not partitioned
>  - operator state: This is the state that a StreamOperator can snapshot, it is similar to the function state, but for operators. it is not partitioned
>  - partitioned state: state that is scoped to the key of the incoming element, in Flink, this is (confusingly) called OperatorState and KvState (internally)
>
> (Operator is the low-level concept, user functions are usually invoked by the operator, for example StreamMap is the operator that handles a MapFunction.)
>
> Function state and operator state is not partitioned, meaning that it becomes difficult when we want to implement dynamic scale-in/scale-out. With partitioned state it can be redistributed when changing the degree of parallelism.
>
> Both stream operators and user functions can have partitioned state, and the namespace is the same, i.e. the state can clash. The partitioned state will stay indefinitely if not manually cleared.
>
> On to timers, operators can register processing-time callbacks, they can react to watermarks to implement event-time callbacks. They have to implement the logic themselves, however. For example, the WindowOperator has custom code to keep track of watermark timers and for reacting to watermarks. User functions have no way of registering timers. Also, timers are not scoped to any key. So if you register a timer while processing an element of a certain key, when the timer fires you don’t know what key was active when registering the timer. This might be necessary for cleaning up state for certain keys, or to trigger processing for a certain key only, for example with session windows of some kind.
>
> Now, on to new stuff. I propose to add a timer facility that can be used by both operators and user functions. Both partitioned state and timers should be aware of keys and if a timer fires the partitioned state should be scoped to the same key that was active when the timer was registered.
>
> One last bit. If we want to also implement the current WindowOperator on top of these generic facilities we need to have a way to scope state not only by key but also by windows (or better, some generic state scope). The reason is, that one key can have several active windows at one point in time and firing timers need to me mapped to the correct window (for example, for sliding windows, or session windows or what have you…).
>
> Happy discussing. :D
>
> Cheers,
> Aljoscha
>
>