Posted to dev@kafka.apache.org by "Matthias J. Sax" <ma...@confluent.io> on 2018/09/07 04:50:38 UTC

Re: [DISCUSS] KIP-362: Dynamic Session Window Support

I cannot follow the example:

>> (10, 10), (15, 3), (19, 5) ...

First, [10,10] is created, second the window is extended to [10,15], and
third [19,19] is created. Why would there be a [15,15]? And why would
(19,5) be merged into [15,15] -- the gap was set to 3 via (15,3) and
thus [19,19] should be its own window?

> Take a look at another example,
> (13, 3),  (19, 5), (15, 3) ...
> 
> in this case when (15, 3) is received, [13,13] should be retrieved and
> merged to a new window [13, 15], then [19,19] should be updated to [13,
> 19]. Correct?

This example makes sense. However, Guozhang's example was different. The
late event _reduces_ the gap, and this can lead to a window split.
Guozhang's example was

>>> (10, 10), (19, 5), (15, 3) ...

First, [10,10] is created; second, [10,19] is created (the gap is 10, so 10
and 19 merge). Last, (15,3) reduces the gap from 10 to 3, thus [10,15] and
[19,19] must be two windows, i.e., the original window [10,19] must be split.
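To make the intended split semantics concrete, here is a small sketch (not Streams code; it recomputes the sessions from scratch for illustration) in which a record, taken in timestamp order, extends a window only if it falls within the gap carried by that window's latest record:

```python
def sessions(records):
    """Recompute session windows from scratch, given all records seen so far.

    Each record is (timestamp, gap). Records are sorted by timestamp; a record
    extends the previous window when it arrives within that window's current
    gap, and the window then adopts the newer record's gap.
    """
    windows = []  # list of (start, end, gap-of-latest-record)
    for ts, gap in sorted(records):
        if windows and ts - windows[-1][1] <= windows[-1][2]:
            start, _, _ = windows[-1]
            windows[-1] = (start, ts, gap)  # extend, adopt newest gap
        else:
            windows.append((ts, ts, gap))
    return [(start, end) for start, end, _ in windows]

# In-order arrival (10,10), (19,5) yields one window [10,19] (19 - 10 <= 10).
# The late record (15,3) then changes the picture: 15 still merges into [10,15]
# (15 - 10 <= 10), but 19 - 15 > 3, so the old [10,19] is split into [10,15]
# and [19,19] -- exactly the outcome described above.
```

Recomputing over sorted records also shows why this semantics is deterministic: the result is independent of arrival order.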


Or maybe you have different semantics in mind for how gaps are dynamically
modified? It's a little unclear from the KIP itself what semantics dynamic
session windows should have.


What is also unclear to me atm is what use cases you have in mind. The
KIP only says

> the statistical aggregation result, liquidity of the records, 


I am not sure what this means. Can you elaborate?



-Matthias



On 8/30/18 3:32 PM, Lei Chen wrote:
> Hi Guozhang,
> 
> Thanks for reviewing the proposal. I didn't think of out of order events
> and glad that you brought it up.
> 
> In the example you gave,
> 
> (10, 10), (19, 5), (15, 3) ...
> 
> my understanding is that the correct result window should be the same as in
> order events
> 
> (10, 10), (15, 3), (19, 5) ...
> 
> when (15, 3) is received, [15,15] is created
> when (19, 5) is received, [15, 15] and [19, 19] are merged and [15, 19] is
> created, meanwhile [15,15] is removed
> 
> back to out of order case,
> 
> when (19 ,5) is received, [19, 19] is created
> when (15, 3) is received, in order to generate the same result,
> 1. if the late event is older than the retention period, it will be dropped
> 2. otherwise, adjacent session windows within the gap should be retrieved
> and merged accordingly, in this case [19, 19], creating a new session [15, 19]
> I'm a little confused when you said "the window [15, 15] SHOULD actually be
> expired at 18 and hence the next record (19, 5) should be for a new session
> already.". If i understand it correctly, the expiration of the window is
> only checked when next event (19,5) comes and then it should be merged to
> it. [15, 15] will then be closed. Is that also what you meant?
> I cannot think of a case where a window will be split by a late event,
> because if events A and C fall into the same session window, a late event B
> in the middle will definitely fall into C's gap as well. IOW, a late event
> will only cause window extension, not split.
> 
> Take a look at another example,
> (13, 3),  (19, 5), (15, 3) ...
> 
> in this case when (15, 3) is received, [13,13] should be retrieved and
> merged to a new window [13, 15], then [19,19] should be updated to [13,
> 19]. Correct?
> 
> To be able to achieve that, like you said, the gap needs to be stored for
> sessions. We don't need to save the gap with each event, but only for each
> session window. To avoid upgrading the existing session window, how about
> creating a new Window type extended from SessionWindow along with a new
> KeySchema?
> 
> What do you think?
> 
> Lei
> 
> 
> On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <wa...@gmail.com> wrote:
> 
>> Hello Lei,
>>
>> Thanks for the proposal. I've just made a quick pass over it and there is a
>> question I have:
>>
>> The session windows are defined per key, i.e. does that mean that each
>> incoming record of the key can dynamically change the gap of the window?
>> For example, say you have the following records for the same key coming in
>> order, where the first value is the timestamp of the record, and the second
>> value is the extracted gap value:
>>
>> (10, 10), (19, 5), ...
>>
>>
>> When we receive the first record at time 10, the gap is extracted as 10,
>> and hence the window will be expired at 20 if no other record is received.
>> When we receive the second record at time 19, the gap is modified to 5, and
>> hence the window will be expired at 24 if no other record is received.
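(The expiry arithmetic in the quoted paragraph is simply the last event's timestamp plus the most recently extracted gap; the function name below is illustrative, not a Streams API.)

```python
def close_time(last_event_ts, current_gap):
    """A session closes if no new record arrives within `current_gap`
    of the last event seen (in event time)."""
    return last_event_ts + current_gap

# After (10, 10) the window would expire at 20; after (19, 5) at 24.
```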
>>
>>
>> If that's the case, I'm wondering how out-of-order data can be handled
>> then, consider this stream:
>>
>> (10, 10), (19, 5), (15, 3) ...
>>
>> I.e. you received a late record at timestamp 15, which shortens
>> the gap to 3. It means that the window SHOULD actually be expired at 18,
>> and hence the next record (19, 5) should be for a new session already.
>> Today's Streams session window implementation does not do "window split", so
>> have you thought about how this can be extended?
>>
>> Also since in your proposal each session window's gap value would be
>> different, we need to store this value along with each record then, how
>> would we store it, and what would be the upgrade path if it is not a
>> compatible change on disk storage etc?
>>
>>
>>
>> Guozhang
>>
>>
>>
>> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <le...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I created a KIP to add dynamic gap session window support to Kafka
>> Streams
>>> DSL.
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> 362%3A+Support+dynamic+gap+session+window
>>>
>>> Please take a look,
>>>
>>> Thanks,
>>> Lei
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
> 


Re: [DISCUSS] KIP-362: Dynamic Session Window Support

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Moved this KIP into status "inactive". Feel free to resume it at any time.

-Matthias


On 1/9/19 10:18 PM, Guozhang Wang wrote:
> Hello Lei,
> 
> Just checking what's the current status of this KIP. We have a KIP deadline
> for 2.2 on 24th and wondering if this one may be able to make it.
> 
> 
> Guozhang
> 
> On Sat, Dec 15, 2018 at 1:01 PM Lei Chen <le...@gmail.com> wrote:
> 
>> Sorry for the late reply Matthias. Have been busy with other work recently.
>> I'll restart the discussion and update the KIP accordingly.
>>
>> Lei
>>
>> On Tue, Nov 6, 2018 at 3:11 PM Matthias J. Sax <ma...@confluent.io>
>> wrote:
>>
>>> Any update on this KIP?
>>>
>>> On 9/20/18 3:30 PM, Matthias J. Sax wrote:
>>>> Thanks for following up. Very nice examples!
>>>>
>>>> I think that the window definition for Flink is semantically
>>>> questionable. If there is only a single record, why is the window
>>>> defined as [ts, ts+gap]? To me, this definition is not sound and seems
>>>> to be arbitrary. To define the windows as [ts-gap,ts+gap] as you
>> mention
>>>> would be semantically more useful -- still, I think that defining the
>>>> window as [ts,ts] as we do currently in Kafka Streams is semantically
>>>> the best.
>>>>
>>>> I have the impression that Flink only defines them differently
>>>> because it solves the issues in the implementation. (I.e., an
>>>> implementation detail leaks into the semantics, which is usually not
>>>> desired.)
>>>>
>>>> However, I believe that we could change the implementation accordingly.
>>>> We could store the windowed keys, as [ts-gap,ts+gap] (or [ts,ts+gap])
>> in
>>>> RocksDB, but at API level we return [ts,ts]. This way, we can still
>> find
>>>> all windows we need and provide the same deterministic behavior and
>> keep
>>>> the current window boundaries on the semantic level (there is no need
>> to
>>>> store the window start and/or end time). With this technique, we can
>>>> also implement dynamic session gaps. I think, we would need to store
>> the
>>>> used "gap" for each window, too. But again, this would be an
>>>> implementation detail.
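A sketch of that storage idea (hypothetical types, not the actual Streams key schema): the store is indexed by the gap-widened range so that neighbor lookups still work, while the API reports only the observed event-time boundaries, and the gap is kept alongside as an implementation detail.

```python
from dataclasses import dataclass

@dataclass
class StoredSession:
    earliest_ts: int   # earliest event timestamp in the session
    latest_ts: int     # latest event timestamp in the session
    gap: int           # gap of the latest record, an implementation detail

    def index_range(self):
        """Widened range used as the store key, so neighbor lookups find us."""
        return (self.earliest_ts - self.gap, self.latest_ts + self.gap)

    def api_window(self):
        """Window boundaries reported to the user, as Streams does today."""
        return (self.earliest_ts, self.latest_ts)

s = StoredSession(earliest_ts=10, latest_ts=15, gap=3)
```

Here `s.index_range()` would be `(7, 18)` while `s.api_window()` stays `(10, 15)`, matching the "store widened, return [ts,ts]" idea above.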
>>>>
>>>> Let's see what others think.
>>>>
>>>> One tricky question we would need to address is how we can be backward
>>>> compatible. I am currently working on KIP-258 that should help to
>>>> address this backward compatibility issue though.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>
>>>> On 9/19/18 5:17 PM, Lei Chen wrote:
>>>>> Thanks Matthias. That makes sense.
>>>>>
>>>>> You're right that symmetric merge is necessary to ensure consistency.
>>>>> On the other hand, I kinda feel it defeats the purpose of a dynamic
>>>>> gap, which is to update the gap from the old value to the new value.
>>>>> The symmetric merge always honors the larger gap in both directions,
>>>>> rather than honoring the gap carried by the record with the larger
>>>>> timestamp. I wasn't able to find any semantic definitions w.r.t. this
>>>>> particular aspect online, but spent some time looking into other
>>>>> streaming engines like Apache Flink.
>>>>>
>>>>> Apache Flink defines the window differently, that uses (start time,
>>> start
>>>>> time + gap).
>>>>>
>>>>> so our previous example (10, 10), (19,5),(15,3) in Flink's case will
>> be:
>>>>> [10,20]
>>>>> [19,24] => merged to [10,24]
>>>>> [15,18] => merged to [10,24]
>>>>>
>>>>> while example (15,3)(19,5)(10,10) will be
>>>>> [15,18]
>>>>> [19,24] => no merge
>>>>> [10,20] => merged to [10,24]
>>>>>
>>>>> however, since it only records gap in future direction, not past, a
>> late
>>>>> record might not trigger any merge where in symmetric merge it would.
>>>>> (7,2),(10, 10), (19,5),(15,3)
>>>>> [7,9]
>>>>> [10,20]
>>>>> [19,24] => merged to [10,24]
>>>>> [15,18] => merged to [10,24]
>>>>> so at the end
>>>>> two windows [7,9][10,24] are there.
>>>>>
>>>>> As you can see, in Flink, the gap semantics are closer to the rule that
>>>>> a gap carried by one record only affects how this record merges with
>>>>> future records, e.g. a later event (T2, G2) will only be merged with
>>>>> (T1, G1) if T2 is less than T1+G1, but not when T1 is less than T2 - G2.
>>>>> Let's call this the "forward-merge" way of handling this. I just went
>>>>> through some source code, and if my understanding of Flink's
>>>>> implementation is incorrect, please correct me.
>>>>>
>>>>> On the other hand, if we want to do symmetric merge in Kafka Streams,
>> we
>>>>> can change the window definition to [start time - gap, start time +
>>> gap].
>>>>> This way the example (7,2),(10, 10), (19,5),(15,3) will be
>>>>> [5,9]
>>>>> [0,20] => merged to [0,20]
>>>>> [14,24] => merged to [0,24]
>>>>> [12,18] => merged to [0,24]
>>>>>
>>>>>  (19,5),(15,3)(7,2),(10, 10) will generate same result
>>>>> [14,24]
>>>>> [12,18] => merged to [12,24]
>>>>> [5,9] => no merge
>>>>> [0,20] => merged to [0,24]
>>>>>
>>>>> Note that symmetric-merge would require us to change the way Kafka
>>>>> Streams fetches windows now: instead of fetching the range from
>>>>> timestamp-gap to timestamp+gap, we will need to fetch all windows that
>>>>> are not expired yet. On the other hand, I'm not sure how this will
>>>>> impact the current logic of how a window is considered closed, because
>>>>> the window doesn't carry the end timestamp anymore, but end timestamp
>>>>> + gap.
>>>>>
>>>>> So do you guys think the forward-merge approach used by Flink makes
>>>>> more sense in Kafka Streams, or does symmetric-merge make more sense?
>>>>> Both of them seem to me to give deterministic results.
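Both semantics in the quoted comparison can be simulated by unioning each record's claimed interval (an illustrative sketch, not either engine's implementation):

```python
def merge_intervals(intervals):
    """Union overlapping or touching intervals, processing in arrival order."""
    merged = []
    for lo, hi in intervals:
        # Find every existing interval the new one overlaps, absorb them all.
        overlapping = [w for w in merged if not (hi < w[0] or lo > w[1])]
        for w in overlapping:
            merged.remove(w)
            lo, hi = min(lo, w[0]), max(hi, w[1])
        merged.append((lo, hi))
    return sorted(merged)

events = [(7, 2), (10, 10), (19, 5), (15, 3)]  # (timestamp, gap)

# Forward-merge (Flink-like): each record claims [ts, ts + gap].
forward = merge_intervals([(ts, ts + gap) for ts, gap in events])
# Symmetric merge: each record claims [ts - gap, ts + gap].
symmetric = merge_intervals([(ts - gap, ts + gap) for ts, gap in events])
```

Under these rules `forward` ends up as the two windows `[(7, 9), (10, 24)]` from the quoted Flink walk-through, while `symmetric` collapses everything into the single window `[(0, 24)]`.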
>>>>>
>>>>> BTW I'll add the use case into original KIP.
>>>>>
>>>>> Lei
>>>>>
>>>>>
>>>>> On Tue, Sep 11, 2018 at 5:45 PM Matthias J. Sax <
>> matthias@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Thanks for explaining your understanding. And thanks for providing
>> more
>>>>>> details about the use-case. Maybe you can add this to the KIP?
>>>>>>
>>>>>>
>>>>>> First, one general comment. I guess that my and Guozhang's
>>>>>> understanding of gap/close/gracePeriod is the same as yours -- we
>>>>>> might not have used the terms precisely in previous emails.
>>>>>>
>>>>>>
>>>>>> To your semantics of gap in detail:
>>>>>>
>>>>>>> I thought when (15,3) is received, kafka streams look up for
>> neighbor
>>>>>>> record/window that is within the gap
>>>>>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created
>>> its
>>>>>> own
>>>>>>> window [10, 10], which is
>>>>>>> out of the gap, so nothing will be found and no merge occurs. Hence
>> we
>>>>>> have
>>>>>>> two windows now in session store,
>>>>>>> [10, 10] and [15, 15] respectively.
>>>>>>
>>>>>> If you have record (10,10), we currently create a window of size
>>>>>> [10,10]. When record (15,3) arrives, you observe that its gap 3 is too
>>>>>> small for it to be merged into the [10,10] window -- however, merging
>>>>>> is a symmetric operation, and the existing window [10,10] has a gap of
>>>>>> 10 defined: thus, 15 is close enough to fall into that gap, and (15,3)
>>>>>> is merged into the existing window, resulting in window [10,15].
>>>>>>
>>>>>> If we don't respect the gap both ways, we end up with inconsistencies
>>>>>> if data is out-of-order. For example, if we use the same input records
>>>>>> (10,10) and (15,3) from above, and it happens that (15,3) is processed
>>>>>> first, wouldn't we want to merge both into a single window when
>>>>>> processing the out-of-order record (10,10), too?
>>>>>>
>>>>>> Does this make sense?
>>>>>>
>>>>>> Now the question remains: if two records with different gap parameters
>>>>>> are merged, which gap should we apply for processing/merging future
>>>>>> records into the window? It seems that we should use the gap parameter
>>>>>> from the record with the largest timestamp, in the example above
>>>>>> (15,3). We would use gap 3 after merging, independent of the order of
>>>>>> processing.
>>>>>>
>>>>>>
>>>>>>> Also another thing worth mentioning is that, the session window
>> object
>>>>>>> created in current kafka streams
>>>>>>> implementation doesn't have gap info, it has start and end, which is
>>> the
>>>>>>> earliest and latest event timestamp
>>>>>>> in that window interval, i.e for (10,10), the session window gets
>>> created
>>>>>>> is [10,10], rather than [10,20]. Just to clarify
>>>>>>> so that it's clear why (10,10) cannot be fetched when looking for
>> gap
>>> of
>>>>>>> (15,3), it's because the end boundary 10 of
>>>>>>> [10,10] is smaller than search boundary [12,18].
>>>>>>
>>>>>> We don't need to store the gap, because the gap is known from the
>>>>>> window definition. The created window size depends on the data that is
>>>>>> contained in the window. I guess one could define it differently, too,
>>>>>> i.e., for the (10,10) record, we create a window [0,20] -- not sure if
>>>>>> it makes a big difference in practice though. Note that creating
>>>>>> window [10,20] would not be correct, because the gap must be applied
>>>>>> in both directions, not just into the future.
>>>>>>
>>>>>> About the second part: the search would not be applied from (15,3) in
>>>>>> range [12,18], but from the existing window [10,10] into range [0,20],
>>>>>> and 15 is contained there. This example also shows that we would need
>>>>>> to come up with a clever way to identify window [10,10] when
>>>>>> processing (15,3) -- not sure atm how to do this. However, only
>>>>>> considering (15,3) would result in inconsistencies for out-of-order
>>>>>> data, as pointed out above, and would not be sufficient.
>>>>>>
>>>>>>
>>>>>> Does this make sense?
>>>>>>
>>>>>>
>>>>>> Or is there another way to define dynamic session gap semantics in a
>>>>>> deterministic way with regard to out-of-order data?
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 9/11/18 4:28 PM, Lei Chen wrote:
>>>>>>> Thanks Matthias and Guozhang for the response.
>>>>>>>
>>>>>>> Seems like our understanding mainly differs in the semantics of gap
>> in
>>>>>>> session windows.
>>>>>>>
>>>>>>> My understanding is that the gap is used to merge nearby records
>>>>>>> together such that no two adjacent records in the merged window are
>>>>>>> farther apart than the gap. In Kafka Streams's implementation it's
>>>>>>> mainly used to find neighbor records/windows in the session store so
>>>>>>> that nearby records can be merged. It is NOT used to determine when a
>>>>>>> window should be closed, which is in fact determined by the window's
>>>>>>> grace period.
>>>>>>>
>>>>>>> Guozhang you said "b. When later we received (15, 3), it means that
>>> this
>>>>>>> record ** changed **
>>>>>>> the window gap interval from 10 to 3, and hence we received a new
>>> record
>>>>>> at
>>>>>>> 15, with the new window gap of 3, it means that by timestamp 18 (15
>> +
>>> 3)
>>>>>> if
>>>>>>> we have not received any new data, the window should be closed, i.e.
>>> the
>>>>>>> window is now [10, 18) which includes two records at 10 and 15."
>>>>>>>
>>>>>>> This is different from what i thought will happen.
>>>>>>>
>>>>>>> I thought when (15,3) is received, kafka streams look up for
>> neighbor
>>>>>>> record/window that is within the gap
>>>>>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created
>>> its
>>>>>> own
>>>>>>> window [10, 10], which is
>>>>>>> out of the gap, so nothing will be found and no merge occurs. Hence
>> we
>>>>>> have
>>>>>>> two windows now in session store,
>>>>>>> [10, 10] and [15, 15] respectively.
>>>>>>>
>>>>>>> Also another thing worth mentioning is that, the session window
>> object
>>>>>>> created in current kafka streams
>>>>>>> implementation doesn't have gap info, it has start and end, which is
>>> the
>>>>>>> earliest and latest event timestamp
>>>>>>> in that window interval, i.e for (10,10), the session window gets
>>> created
>>>>>>> is [10,10], rather than [10,20]. Just to clarify
>>>>>>> so that it's clear why (10,10) cannot be fetched when looking for
>> gap
>>> of
>>>>>>> (15,3), it's because the end boundary 10 of
>>>>>>> [10,10] is smaller than search boundary [12,18].
>>>>>>>
>>>>>>> Please correct me if my understanding is wrong here.
>>>>>>>
>>>>>>> @Matthias, to answer your use case question, we have a use case where
>>>>>>> asynchronous time series data are received in the stream, from
>>>>>>> different contributors, with different quality and at a different
>>>>>>> pace. Inside Kafka Streams, we use state to maintain statistical
>>>>>>> aggregations and other mathematical models to track the liquidity and
>>>>>>> calculate the time decay rate and dynamic gap, so that at runtime,
>>>>>>> for each contributor, we can
>>>>>>> 1. determine how many historical records we should maintain in state.
>>>>>>> 2. for each incoming record, output a record using aggregations from
>>>>>>> *nearby* records from that contributor.
>>>>>>> Why doesn't a fixed-gap session window work here? Because the
>>>>>>> definition of "nearby" here is determined by several very dynamic
>>>>>>> factors in our case; it changes not only with different hours of the
>>>>>>> day, but also in relation to other contributors.
>>>>>>> The purpose of this KIP is to suggest a dynamic session window
>>>>>>> implementation so that we can embed such dynamic "nearby" calculation
>>>>>>> capability into Kafka Streams session window semantics. Hope it makes
>>>>>>> sense to you.
>>>>>>>
>>>>>>> Lei
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 5:27 PM Guozhang Wang <wa...@gmail.com>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Lei,
>>>>>>>>
>>>>>>>> As Matthias mentioned, the key question here is that, because of
>>>>>>>> late arrivals of records which may indicate a shorter session gap
>>>>>>>> interval, some session windows may be "mistakenly" merged, and hence
>>>>>>>> the merge needs to be undone, i.e. they need to be split again.
>>>>>>>>
>>>>>>>> Back to my example, you are right that the processing result of
>>>>>>>>
>>>>>>>> (10, 10), (19, 5), (15, 3) ..
>>>>>>>>
>>>>>>>> should be the same as the processing result of
>>>>>>>>
>>>>>>>> (10, 10), (15, 3), (19, 5) ..
>>>>>>>>
>>>>>>>> Note that the second value is NOT the window end time, but the
>>> extracted
>>>>>>>> window gap interval, as you suggested in the KIP this value can be
>>>>>>>> dynamically changed
>>>>>>>>
>>>>>>>> a. If you take a look at the second ordering, when we receive (10,
>>> 10)
>>>>>> it
>>>>>>>> means a window starting at 10 is created, and its gap interval is
>> 10,
>>>>>> which
>>>>>>>> means that if by the timestamp of 20 we do not receive any new
>> data,
>>>>>> then
>>>>>>>> the window should be closed, i.e. the window [10, 20).
>>>>>>>>
>>>>>>>> b. When later we received (15, 3), it means that this record **
>>> changed
>>>>>> **
>>>>>>>> the window gap interval from 10 to 3, and hence we received a new
>>>>>> record at
>>>>>>>> 15, with the new window gap of 3, it means that by timestamp 18
>> (15 +
>>>>>> 3) if
>>>>>>>> we have not received any new data, the window should be closed,
>> i.e.
>>> the
>>>>>>>> window is now [10, 18) which includes two records at 10 and 15.
>>>>>>>>
>>>>>>>> c. The third record is received at 19, which is after the window
>>>>>>>> close time 18; it means that we should now start a new window
>>>>>>>> starting at 19, i.e. the window is [19, 24).
>>>>>>>>
>>>>>>>>
>>>>>>>> BUT, because of the out-of-order arrival, we did not receive (15, 3)
>>>>>>>> in time but received (19, 5) first, which causes us to mistakenly
>>>>>>>> merge the window [10, 20) with [19, 24) into [10, 24); only when we
>>>>>>>> later receive (15, 3) do we realize that the previous window should
>>>>>>>> have ended at 18.
>>>>>>>>
>>>>>>>> Does that make sense to you?
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
> 
> 


Re: [DISCUSS] KIP-362: Dynamic Session Window Support

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Lei,

Just checking what's the current status of this KIP. We have a KIP deadline
for 2.2 on 24th and wondering if this one may be able to make it.


Guozhang

On Sat, Dec 15, 2018 at 1:01 PM Lei Chen <le...@gmail.com> wrote:

> Sorry for the late reply Matthias. Have been busy with other work recently.
> I'll restart the discussion and update the KIP accordingly.
>
> Lei
>
> On Tue, Nov 6, 2018 at 3:11 PM Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > Any update on this KIP?
> >
> > On 9/20/18 3:30 PM, Matthias J. Sax wrote:
> > > Thanks for following up. Very nice examples!
> > >
> > > I think, that the window definition for Flink is semantically
> > > questionable. If there is only a single record, why is the window
> > > defined as [ts, ts+gap]? To me, this definition is not sound and seems
> > > to be arbitrary. To define the windows as [ts-gap,ts+gap] as you
> mention
> > > would be semantically more useful -- still, I think that defining the
> > > window as [ts,ts] as we do currently in Kafka Streams is semantically
> > > the best.
> > >
> > > I have the impression that Flink only defines them differently because
> > > it solves the issues in the implementation. (I.e., an implementation
> > > detail leaks into the semantics, which is usually not desired.)
> > >
> > > However, I believe that we could change the implementation accordingly.
> > > We could store the windowed keys, as [ts-gap,ts+gap] (or [ts,ts+gap])
> in
> > > RocksDB, but at API level we return [ts,ts]. This way, we can still
> find
> > > all windows we need and provide the same deterministic behavior and
> keep
> > > the current window boundaries on the semantic level (there is no need
> to
> > > store the window start and/or end time). With this technique, we can
> > > also implement dynamic session gaps. I think, we would need to store
> the
> > > used "gap" for each window, too. But again, this would be an
> > > implementation detail.
> > >
> > > Let's see what others think.
> > >
> > > One tricky question we would need to address is, how we can be backward
> > > compatible. I am currently working on KIP-258 that should help to
> > > address this backward compatibility issue though.
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 9/19/18 5:17 PM, Lei Chen wrote:
> > >> Thanks Matthias. That makes sense.
> > >>
> > >> You're right that symmetric merge is necessary to ensure consistency.
> > >> On the other hand, I kinda feel it defeats the purpose of the dynamic
> > >> gap, which is to update the gap from the old value to the new value.
> > >> The symmetric merge always honors the larger gap in both directions,
> > >> rather than honoring the gap carried by the record with the larger
> > >> timestamp. I wasn't able to find any semantic definitions w.r.t. this
> > >> particular aspect online, but spent some time looking into other
> > >> streaming engines like Apache Flink.
> > >>
> > >> Apache Flink defines the window differently: it uses (start time,
> > >> start time + gap).
> > >>
> > >> so our previous example (10, 10), (19,5),(15,3) in Flink's case will
> be:
> > >> [10,20]
> > >> [19,24] => merged to [10,24]
> > >> [15,18] => merged to [10,24]
> > >>
> > >> while example (15,3)(19,5)(10,10) will be
> > >> [15,18]
> > >> [19,24] => no merge
> > >> [10,20] => merged to [10,24]
> > >>
> > >> however, since it only records gap in future direction, not past, a
> late
> > >> record might not trigger any merge where in symmetric merge it would.
> > >> (7,2),(10, 10), (19,5),(15,3)
> > >> [7,9]
> > >> [10,20]
> > >> [19,24] => merged to [10,24]
> > >> [15,18] => merged to [10,24]
> > >> so at the end
> > >> two windows [7,9][10,24] are there.
> > >>
> > >> As you can see, in Flink, the gap semantics lean more toward the idea
> > >> that a gap carried by one record only affects how this record merges
> > >> with future records, e.g. a later event (T2, G2) will only be merged
> > >> with (T1, G1) if T2 is less than T1+G1, but not when T1 is less than
> > >> T2 - G2. Let's call this the "forward-merge" way of handling this. I
> > >> just went through some source code, and if my understanding of Flink's
> > >> implementation is incorrect, please correct me.
> > >>
> > >> On the other hand, if we want to do symmetric merge in Kafka Streams,
> we
> > >> can change the window definition to [start time - gap, start time +
> > gap].
> > >> This way the example (7,2),(10, 10), (19,5),(15,3) will be
> > >> [5,9]
> > >> [0,20] => merged to [0,20]
> > >> [14,24] => merged to [0,24]
> > >> [12,18] => merged to [0,24]
> > >>
> > >>  (19,5),(15,3)(7,2),(10, 10) will generate same result
> > >> [14,24]
> > >> [12,18] => merged to [12,24]
> > >> [5,9] => no merge
> > >> [0,20] => merged to [0,24]
> > >>
> > >> Note that symmetric-merge would require us to change the way Kafka
> > >> Streams fetches windows now: instead of fetching the range from
> > >> timestamp-gap to timestamp+gap, we will need to fetch all windows that
> > >> are not expired yet.
> > >> On the other hand, I'm not sure how this will impact the current logic
> > of
> > >> how a window is considered as closed, because the window doesn't carry
> > end
> > >> timestamp anymore, but end timestamp + gap.
> > >>
> > >> So do you guys think the forward-merge approach used by Flink makes
> > >> more sense in Kafka Streams, or symmetric-merge makes more sense? Both
> > >> of them seem to me to give a deterministic result.
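[Editorial note: to make the two conventions concrete, here is a toy
interval-merging sketch that reproduces the examples above. It is plain
Python for illustration, not the Kafka Streams or Flink implementation;
`forward` models the Flink-style [ts, ts+gap] intervals and `symmetric` the
[ts-gap, ts+gap] variant.]

```python
def merge_sessions(records, interval):
    """records: (timestamp, gap) pairs in arrival order;
    interval(ts, gap) -> (lo, hi); overlapping intervals are unioned."""
    wins = []
    for ts, gap in records:
        lo, hi = interval(ts, gap)
        # find all existing windows overlapping the new interval
        hits = [w for w in wins if w[0] <= hi and w[1] >= lo]
        for w in hits:
            wins.remove(w)
        # union the new interval with every overlapping window
        wins.append((min([lo] + [w[0] for w in hits]),
                     max([hi] + [w[1] for w in hits])))
    return sorted(wins)

forward = lambda ts, gap: (ts, ts + gap)          # Flink-style convention
symmetric = lambda ts, gap: (ts - gap, ts + gap)  # gap applied both ways
```

With the forward convention, (7,2),(10,10),(19,5),(15,3) ends with the two
windows [7,9] and [10,24] as described above, while the symmetric convention
merges everything into [0,24] regardless of arrival order.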
> > >>
> > >> BTW I'll add the use case into original KIP.
> > >>
> > >> Lei
> > >>
> > >>
> > >> On Tue, Sep 11, 2018 at 5:45 PM Matthias J. Sax <
> matthias@confluent.io>
> > >> wrote:
> > >>
> > >>> Thanks for explaining your understanding. And thanks for providing
> more
> > >>> details about the use-case. Maybe you can add this to the KIP?
> > >>>
> > >>>
> > >>> First, one general comment. I guess that my and Guozhang's
> > >>> understanding of gap/close/gracePeriod is the same as yours -- we
> > >>> might not have used the terms precisely in previous emails.
> > >>>
> > >>>
> > >>> To your semantics of the gap in detail:
> > >>>
> > >>>> I thought when (15,3) is received, kafka streams look up for
> neighbor
> > >>>> record/window that is within the gap
> > >>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created
> > its
> > >>> own
> > >>>> window [10, 10], which is
> > >>>> out of the gap, so nothing will be found and no merge occurs. Hence
> we
> > >>> have
> > >>>> two windows now in session store,
> > >>>> [10, 10] and [15, 15] respectively.
> > >>>
> > >>> If you have record (10,10), we currently create a window of size
> > >>> [10,10]. When record (15,3) arrives, your observation is that the
> > >>> gap 3 is too small for it to be merged into the [10,10] window --
> > >>> however, merging is a
> > >>> symmetric operation and the existing window of [10,10] has a gap of
> 10
> > >>> defined: thus, 15 is close enough to fall into the gap, and (15,3) is
> > >>> merged into the existing window resulting in window [10,15].
> > >>>
> > >>> If we don't respect the gap both ways, we end up with inconsistencies
> > if
> > >>> data is out-of-order. For example, if we use the same input record
> > >>> (10,10) and (15,3) from above, and it happens that (15,3) is
> processed
> > >>> first, when processing out-of-order record (10,10) we would want to
> > >>> merge both into a single window, too?
> > >>>
> > >>> Does this make sense?
> > >>>
> > >>> Now the question remains, if two records with different gap parameter
> > >>> are merged, which gap should we apply for processing/merging future
> > >>> records into the window? It seems that we should use the gap
> > >>> parameter from the record with the largest timestamp. In the example
> > >>> above, that is (15,3): we would use gap 3 after merging, independent
> > >>> of the order of processing.
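[Editorial note: the symmetric rule plus "keep the gap of the record with
the largest timestamp" can be sketched as follows. This is illustrative
Python under the semantics proposed in this thread, not the actual Kafka
Streams code; sessions are tracked as [start, end, gap] over the real record
timestamps, matching the [ts,ts]-style windows discussed here.]

```python
def process(records):
    """Symmetric merge sketch: a record (ts, gap) merges with a session if
    the record falls inside the session's gap-extended range OR the session
    overlaps the record's own gap-extended range. The surviving gap is the
    one carried by the record with the largest timestamp."""
    sessions = []  # each entry: [start, end, gap]
    for ts, gap in records:
        hits = [s for s in sessions
                if s[0] - s[2] <= ts <= s[1] + s[2]          # session's gap reaches ts
                or (s[0] <= ts + gap and s[1] >= ts - gap)]  # record's gap reaches session
        for s in hits:
            sessions.remove(s)
        cands = hits + [[ts, ts, gap]]
        latest = max(cands, key=lambda s: s[1])  # largest-timestamp gap wins
        sessions.append([min(s[0] for s in cands), latest[1], latest[2]])
    return sorted(sessions)
```

Every arrival order of (10,10), (15,3), (19,5) ends with the single session
[10,19] carrying gap 5, i.e. the result is deterministic under out-of-order
data, which is the point of the symmetric rule.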
> > >>>
> > >>>
> > >>>> Also another thing worth mentioning is that, the session window
> object
> > >>>> created in current kafka streams
> > >>>> implementation doesn't have gap info, it has start and end, which is
> > the
> > >>>> earliest and latest event timestamp
> > >>>> in that window interval, i.e for (10,10), the session window gets
> > created
> > >>>> is [10,10], rather than [10,20]. Just to clarify
> > >>>> so that it's clear why (10,10) cannot be fetched when looking for
> gap
> > of
> > >>>> (15,3), it's because the end boundary 10 of
> > >>>> [10,10] is smaller than search boundary [12,18].
> > >>>
> > >>> We don't need to store the gap, because the gap is known from the
> > >>> window
> > >>> definition. The created window size depends on the data that is
> > >>> contained in the window. I guess one could define it differently,
> too,
> > >>> ie, for the (10,10) record, we create a window [0,20] -- not sure if
> it
> > >>> makes a big difference in practice though. Note, that creating window
> > >>> [10,20] would not be correct, because the gap must be applied in both
> > >>> directions, not just into the future.
> > >>>
> > >>> About the second part: the search would not be applied from (15,3) in
> > >>> range [12,18], but from existing window [10,10] into range [0,20] and
> > 15
> > >>> is contained there. This example also shows that we would need to
> > >>> come up with a clever way to identify window [10,10] when processing
> > >>> (15,3) -- not sure atm how to do this. However, only considering
> > >>> (15,3) would result in inconsistencies for out-of-order data, as
> > >>> pointed out above, and would not be sufficient.
> > >>>
> > >>>
> > >>> Does this make sense?
> > >>>
> > >>>
> > >>> Or is there another way to define dynamic session gap semantics in a
> > >>> deterministic way with regard to out-of-order data?
> > >>>
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>> On 9/11/18 4:28 PM, Lei Chen wrote:
> > >>>> Thanks Matthias and Guozhang for the response.
> > >>>>
> > >>>> Seems like our understanding mainly differs in the semantics of gap
> in
> > >>>> session windows.
> > >>>>
> > >>>> My understanding is that the gap is used to merge nearby records
> > >>>> together such that no record in the merged window is farther than
> > >>>> the gap from its neighbor. In Kafka Streams's implementation it's
> > >>>> mainly used to find neighbor records/windows in the session store so
> > >>>> that nearby records can be merged. It is NOT used to determine when
> > >>>> a window should be closed, which is in fact determined by the
> > >>>> window's grace period.
> > >>>>
> > >>>> Guozhang you said "b. When later we received (15, 3), it means that
> > this
> > >>>> record ** changed **
> > >>>> the window gap interval from 10 to 3, and hence we received a new
> > record
> > >>> at
> > >>>> 15, with the new window gap of 3, it means that by timestamp 18 (15
> +
> > 3)
> > >>> if
> > >>>> we have not received any new data, the window should be closed, i.e.
> > the
> > >>>> window is now [10, 18) which includes two records at 10 and 15."
> > >>>>
> > >>>> This is different from what i thought will happen.
> > >>>>
> > >>>> I thought when (15,3) is received, kafka streams look up for
> neighbor
> > >>>> record/window that is within the gap
> > >>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created
> > its
> > >>> own
> > >>>> window [10, 10], which is
> > >>>> out of the gap, so nothing will be found and no merge occurs. Hence
> we
> > >>> have
> > >>>> two windows now in session store,
> > >>>> [10, 10] and [15, 15] respectively.
> > >>>>
> > >>>> Also another thing worth mentioning is that, the session window
> object
> > >>>> created in current kafka streams
> > >>>> implementation doesn't have gap info, it has start and end, which is
> > the
> > >>>> earliest and latest event timestamp
> > >>>> in that window interval, i.e for (10,10), the session window gets
> > created
> > >>>> is [10,10], rather than [10,20]. Just to clarify
> > >>>> so that it's clear why (10,10) cannot be fetched when looking for
> gap
> > of
> > >>>> (15,3), it's because the end boundary 10 of
> > >>>> [10,10] is smaller than search boundary [12,18].
> > >>>>
> > >>>> Please correct me if my understanding is wrong here.
> > >>>>
> > >>>> @Matthias, to answer your use case question, we have a use case
> > >>>> where asynchronous time series data are received in the stream, from
> > >>>> different contributors, with different quality and at a different
> > >>>> pace.
> > >>>> Inside Kafka Streams, we use state to maintain statistical
> > >>>> aggregations and other mathematical models to track the liquidity
> > >>>> and calculate the time decay rate and dynamic gap, so that
> > at
> > >>>> runtime, for each contributor we can
> > >>>> 1. determine how many historical records we should maintain in
> state.
> > >>>> 2. for each incoming record, output a record using aggregations from
> > >>>> *nearby* records from that contributor.
> > >>>> Why doesn't a fixed-gap session window work here? Because the
> > >>>> definition of "nearby" here is determined by several very dynamic
> > >>>> factors in our case; it changes not only with different hours of the
> > >>>> day, but is also related to other contributors.
> > >>>> The purpose of this KIP is to suggest a dynamic session window
> > >>>> implementation so that we can embed such
> > >>>> dynamic "nearby" calculation capability into kafka streams session
> > >>> windows
> > >>>> semantics. Hope it makes sense to you.
> > >>>>
> > >>>> Lei
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Mon, Sep 10, 2018 at 5:27 PM Guozhang Wang <wa...@gmail.com>
> > >>> wrote:
> > >>>>
> > >>>>> Hello Lei,
> > >>>>>
> > >>>>> As Matthias mentioned, the key question here is that because of the
> > late
> > >>>>> arrivals of records which may indicate a shorter session gap
> > interval,
> > >>> some
> > >>>>> session windows may be "mistakenly" merged and hence need to be
> > undone
> > >>> the
> > >>>>> merge, i.e. to split them again.
> > >>>>>
> > >>>>> Back to my example, you are right that the processing result of
> > >>>>>
> > >>>>> (10, 10), (19, 5), (15, 3) ..
> > >>>>>
> > >>>>> should be the same as the processing result of
> > >>>>>
> > >>>>> (10, 10), (15, 3), (19, 5) ..
> > >>>>>
> > >>>>> Note that the second value is NOT the window end time, but the
> > >>>>> extracted window gap interval; as you suggested in the KIP, this
> > >>>>> value can be dynamically changed.
> > >>>>>
> > >>>>> a. If you take a look at the second ordering, when we receive (10,
> > 10)
> > >>> it
> > >>>>> means a window starting at 10 is created, and its gap interval is
> 10,
> > >>> which
> > >>>>> means that if by the timestamp of 20 we do not receive any new
> data,
> > >>> then
> > >>>>> the window should be closed, i.e. the window [10, 20).
> > >>>>>
> > >>>>> b. When later we received (15, 3), it means that this record **
> > changed
> > >>> **
> > >>>>> the window gap interval from 10 to 3, and hence we received a new
> > >>> record at
> > >>>>> 15, with the new window gap of 3, it means that by timestamp 18
> (15 +
> > >>> 3) if
> > >>>>> we have not received any new data, the window should be closed,
> i.e.
> > the
> > >>>>> window is now [10, 18) which includes two records at 10 and 15.
> > >>>>>
> > >>>>> c. The third record is received at 19, which is after the window
> > close
> > >>> time
> > >>>>> 18, it means that we should now start a new window starting at 19,
> > i.e.
> > >>> the
> > >>>>> window is [19, 24),
> > >>>>>
> > >>>>>
> > >>>>> BUT, because of the out-of-order arrival, we did not receive
> > >>>>> (15, 3) in time but received (19, 5) first; this will cause us to
> > >>>>> mistakenly merge the window [10, 20) with [19, 24) into [10, 24),
> > >>>>> and only when we later receive (15, 3) do we realize that the
> > >>>>> previous window should have ended at 18.
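[Editorial note: for records processed in timestamp order, the semantics of
steps a-c above can be sketched as follows. Illustrative Python only: the
window close time is the last timestamp plus the most recently extracted
gap.]

```python
def sessions_in_order(records):
    """records: (ts, gap) pairs already sorted by timestamp.
    A record extends the current session if it arrives no later than
    last_ts + current_gap; its gap then becomes the session's gap.
    Otherwise the current session is closed and a new one starts."""
    windows, cur = [], None  # cur: [start, last_ts, gap]
    for ts, gap in records:
        if cur is not None and ts <= cur[1] + cur[2]:
            cur[1], cur[2] = ts, gap  # extend; the new gap takes over
        else:
            if cur is not None:
                windows.append((cur[0], cur[1]))  # close previous session
            cur = [ts, ts, gap]
    if cur is not None:
        windows.append((cur[0], cur[1]))
    return windows
```

In timestamp order, (10,10), (15,3), (19,5) yields the two sessions [10,15]
and [19,19] (the (15,3) record shrinks the gap, so 19 starts a new session);
processing only the arrival order (10,10), (19,5) yields the single session
[10,19], which is exactly the mistaken merge that the late (15,3) would have
to split.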
> > >>>>>
> > >>>>> Does that make sense to you?
> > >>>>>
> > >>>>>
> > >>>>> Guozhang
> > >>>>>
> > >>>>>
> > >>>>> On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax <
> > matthias@confluent.io>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> I cannot follow the example:
> > >>>>>>
> > >>>>>>>> (10, 10), (15, 3), (19, 5) ...
> > >>>>>>
> > >>>>>> First, [10,10] is created, second the window is extended to
> [10,15],
> > >>> and
> > >>>>>> third [19,19] is created. Why would there be a [15,15]? And why
> > would
> > >>>>>> (19,5) be merged into [15,15] -- the gap was set to 3 via (15,3)
> and
> > >>>>>> thus [19,19] should be its own window?
> > >>>>>>
> > >>>>>>> Take a look at another example,
> > >>>>>>> (13, 3),  (19, 5), (15, 3) ...
> > >>>>>>>
> > >>>>>>> in this case when (15, 3) is received, [13,13] should be
> retrieved
> > and
> > >>>>>>> merged to a new window [13, 15], then [19,19] should be updated
> to
> > >>> [13,
> > >>>>>>> 19]. Correct?
> > >>>>>>
> > >>>>>> This example makes sense. However, Guozhang's example was
> > >>>>>> different. The late event _reduces_ the gap, and this can lead to
> > >>>>>> a window split. Guozhang's example was
> > >>>>>>
> > >>>>>>>>> (10, 10), (19, 5), (15, 3) ...
> > >>>>>>
> > >>>>>> First [10,10] is created, second [10,19] is created (gap is 10, so
> > >>>>>> 10 and 19 merge). Last, (15,3) reduces the gap from 10 to 3, thus
> > >>>>>> [10,15] and [19,19] must be two windows, i.e., the original window
> > >>>>>> [10,19] must be split.
> > >>>>>>
> > >>>>>>
> > >>>>>> Or maybe you have different semantics in mind for how gaps are
> > >>>>>> dynamically modified? It's a little unclear from the KIP itself
> > >>>>>> what semantics dynamic session windows should have.
> > >>>>>>
> > >>>>>>
> > >>>>>> What is also unclear to me atm is, what use cases you have in
> mind?
> > The
> > >>>>>> KIP only says
> > >>>>>>
> > >>>>>>> the statistical aggregation result, liquidity of the records,
> > >>>>>>
> > >>>>>>
> > >>>>>> I am not sure what this means. Can you elaborate?
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On 8/30/18 3:32 PM, Lei Chen wrote:
> > >>>>>>> Hi Guozhang,
> > >>>>>>>
> > >>>>>>> Thanks for reviewing the proposal. I didn't think of out of order
> > >>>>> events
> > >>>>>>> and glad that you brought it up.
> > >>>>>>>
> > >>>>>>> In the example you gave,
> > >>>>>>>
> > >>>>>>> (10, 10), (19, 5), (15, 3) ...
> > >>>>>>>
> > >>>>>>> my understanding is that the correct result window should be the
> > same
> > >>>>> as
> > >>>>>> in
> > >>>>>>> order events
> > >>>>>>>
> > >>>>>>> (10, 10), (15, 3), (19, 5) ...
> > >>>>>>>
> > >>>>>>> when (15, 3) is received, [15,15] is created
> > >>>>>>> when (19, 5) is received, [15, 15] and [19, 19] are merged and
> [15,
> > >>> 19]
> > >>>>>> is
> > >>>>>>> created, meanwhile [15,15] is removed
> > >>>>>>>
> > >>>>>>> back to out of order case,
> > >>>>>>>
> > >>>>>>> when (19, 5) is received, [19, 19] is created
> > >>>>>>> when (15, 3) is received, in order to generate the same result,
> > >>>>>>> 1. if late event is later than retention period, it will be
> dropped
> > >>>>>>> 2. otherwise, adjacent session windows within gap should be
> > retrieved
> > >>>>> and
> > >>>>>>> merged accordingly, in this case [19, 19], and create a new
> session
> > >>>>> [15,
> > >>>>>> 19]
> > >>>>>>> I'm a little confused when you said "the window [15, 15] SHOULD
> > >>>>>>> actually be expired at 18 and hence the next record (19, 5)
> > >>>>>>> should be for a new session already". If I understand it
> > >>>>>>> correctly, the expiration of the window is only checked when the
> > >>>>>>> next event (19,5) comes, and then it should be merged into it.
> > >>>>>>> [15, 15] will then be closed. Is that also what you meant?
> > >>>>>>> I cannot think of a case where a window will be split by a late
> > event,
> > >>>>>>> because if event A and C fall into the same session window, a
> late
> > >>>>> event
> > >>>>>> B
> > >>>>>>> in middle will definitely fall into C's gap as well. IOW, late
> > event
> > >>>>> will
> > >>>>>>> only cause window extension, not split.
> > >>>>>>>
> > >>>>>>> Take a look at another example,
> > >>>>>>> (13, 3),  (19, 5), (15, 3) ...
> > >>>>>>>
> > >>>>>>> in this case when (15, 3) is received, [13,13] should be
> retrieved
> > and
> > >>>>>>> merged to a new window [13, 15], then [19,19] should be updated
> to
> > >>> [13,
> > >>>>>>> 19]. Correct?
> > >>>>>>>
> > >>>>>>> To be able to achieve that, like you said, the gap needs to be
> > stored
> > >>>>> for
> > >>>>>>> sessions. We don't need to save the gap with each event, but only
> > for
> > >>>>>> each
> > >>>>>>> session window. To avoid upgrading existing session windows, how
> > >>>>>>> about creating a new Window type extended from SessionWindow,
> > >>>>>>> along with a new KeySchema?
> > >>>>>>>
> > >>>>>>> What do you think?
> > >>>>>>>
> > >>>>>>> Lei
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <
> wangguoz@gmail.com>
> > >>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hello Lei,
> > >>>>>>>>
> > >>>>>>>> Thanks for the proposal. I've just made a quick pass over it and
> > >>> there
> > >>>>>> is a
> > >>>>>>>> question I have:
> > >>>>>>>>
> > >>>>>>>> The session windows are defined per key, i.e. does that mean
> that
> > >>> each
> > >>>>>>>> incoming record of the key can dynamically change the gap of the
> > >>>>> window?
> > >>>>>>>> For example, say you have the following record for the same key
> > >>> coming
> > >>>>>> in
> > >>>>>>>> order, where the first time is the timestamp of the record, and
> > the
> > >>>>>> second
> > >>>>>>>> value is the extracted gap value:
> > >>>>>>>>
> > >>>>>>>> (10, 10), (19, 5), ...
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> When we receive the first record at time 10, the gap is
> extracted
> > as
> > >>>>> 10,
> > >>>>>>>> and hence the window will be expired at 20 if no other record is
> > >>>>>> received.
> > >>>>>>>> When we receive the second record at time 19, the gap is
> modified
> > to
> > >>>>> 5,
> > >>>>>> and
> > >>>>>>>> hence the window will be expired at 24 if no other record is
> > >>> received.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> If that's the case, I'm wondering how out-of-order data can be
> > >>> handled
> > >>>>>>>> then, consider this stream:
> > >>>>>>>>
> > >>>>>>>> (10, 10), (19, 5), (15, 3) ...
> > >>>>>>>>
> > >>>>>>>> I.e. you received a late record indicating at timestamp 15,
> which
> > >>>>>> shorten
> > >>>>>>>> the gap to 3. It means that the window SHOULD actually be
> expired
> > at
> > >>>>> 18,
> > >>>>>>>> and hence the next record (19, 5) should be for a new session
> > >>> already.
> > >>>>>>>> Today Streams session window implementation does not do "window
> > >>>>> split",
> > >>>>>> so
> > >>>>>>>> have you thought about how this can be extended?
> > >>>>>>>>
> > >>>>>>>> Also, since in your proposal each session window's gap value
> > >>>>>>>> would be different, we would need to store this value along with
> > >>>>>>>> each record. How would we store it, and what would be the
> > >>>>>>>> upgrade path if it is not a compatible change on disk storage,
> > >>>>>>>> etc.?
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Guozhang
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <le...@gmail.com>
> > wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi All,
> > >>>>>>>>>
> > >>>>>>>>> I created a KIP to add dynamic gap session window support to
> > Kafka
> > >>>>>>>> Streams
> > >>>>>>>>> DSL.
> > >>>>>>>>>
> > >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-362%3A+Support+dynamic+gap+session+window
> > >>>>>>>>>
> > >>>>>>>>> Please take a look,
> > >>>>>>>>>
> > >>>>>>>>> Thanks,
> > >>>>>>>>> Lei
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> --
> > >>>>>>>> -- Guozhang
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>> --
> > >>>>> -- Guozhang
> > >>>>>
> > >>>>
> > >>>
> > >>>
> > >>
> > >
> >
> >
>


-- 
-- Guozhang

Re: [DISCUSS] KIP-362: Dynamic Session Window Support

Posted by Lei Chen <le...@gmail.com>.
Sorry for the late reply Matthias. Have been busy with other work recently.
I'll restart the discussion and update the KIP accordingly.

Lei

On Tue, Nov 6, 2018 at 3:11 PM Matthias J. Sax <ma...@confluent.io>
wrote:

> Any update on this KIP?
>
> On 9/20/18 3:30 PM, Matthias J. Sax wrote:
> > Thanks for following up. Very nice examples!
> >
> > I think, that the window definition for Flink is semantically
> > questionable. If there is only a single record, why is the window
> > defined as [ts, ts+gap]? To me, this definition is not sound and seems
> > to be arbitrary. To define the windows as [ts-gap,ts+gap] as you mention
> > would be semantically more useful -- still, I think that defining the
> > window as [ts,ts] as we do currently in Kafka Streams is semantically
> > the best.
> >
> > I have the impression, that Flink only defines them differently, because
> > it solves the issues in the implementation. (Ie, an implementation
> > details leaks into the semantics, what is usually not desired.)
> >
> > However, I believe that we could change the implementation accordingly.
> > We could store the windowed keys, as [ts-gap,ts+gap] (or [ts,ts+gap]) in
> > RocksDB, but at API level we return [ts,ts]. This way, we can still find
> > all windows we need and provide the same deterministic behavior and keep
> > the current window boundaries on the semantic level (there is no need to
> > store the window start and/or end time). With this technique, we can
> > also implement dynamic session gaps. I think, we would need to store the
> > used "gap" for each window, too. But again, this would be an
> > implementation detail.
> >
> > Let's see what others think.
> >
> > One tricky question we would need to address is, how we can be backward
> > compatible. I am currently working on KIP-258 that should help to
> > address this backward compatibility issue though.
> >
> >
> > -Matthias
> >
> >
> >
> > On 9/19/18 5:17 PM, Lei Chen wrote:
> >> Thanks Matthias. That makes sense.
> >>
> >> You're right that symmetric merge is necessary to ensure consistency. On
> >> the other hand, I kinda feel it defeats the purpose of dynamic gap,
> which
> >> is to update the gap from old value to new value. The symmetric merge
> >> always honor the larger gap in both direction, rather than honor the gap
> >> carried by record with larger timestamp. I wasn't able to find any
> semantic
> >> definitions w.r.t this particular aspect online, but spent some time
> >> looking into other streaming engines like Apache Flink.
> >>
> >> Apache Flink defines the window differently, that uses (start time,
> start
> >> time + gap).
> >>
> >> so our previous example (10, 10), (19,5),(15,3) in Flink's case will be:
> >> [10,20]
> >> [19,24] => merged to [10,24]
> >> [15,18] => merged to [10,24]
> >>
> >> while example (15,3)(19,5)(10,10) will be
> >> [15,18]
> >> [19,24] => no merge
> >> [10,20] => merged to [10,24]
> >>
> >> however, since it only records gap in future direction, not past, a late
> >> record might not trigger any merge where in symmetric merge it would.
> >> (7,2),(10, 10), (19,5),(15,3)
> >> [7,9]
> >> [10,20]
> >> [19,24] => merged to [10,24]
> >> [15,18] => merged to [10,24]
> >> so at the end
> >> two windows [7,9][10,24] are there.
> >>
> >> As you can see, in Flink, the gap semantic is more toward to the way
> that,
> >> a gap carried by one record only affects how this record merges with
> future
> >> records. e.g. a later event (T2, G2) will only be merged with (T1, G1)
> is
> >> T2 is less than T1+G1, but not when T1 is less than T2 - G2. Let's call
> >> this "forward-merge" way of handling this. I just went thought some
> source
> >> code and if my understanding is incorrect about Flink's implementation,
> >> please correct me.
> >>
> >> On the other hand, if we want to do symmetric merge in Kafka Streams, we
> >> can change the window definition to [start time - gap, start time +
> gap].
> >> This way the example (7,2),(10, 10), (19,5),(15,3) will be
> >> [5,9]
> >> [0,20] => merged to [0,20]
> >> [14,24] => merged to [0,24]
> >> [12,18] => merged to [0,24]
> >>
> >>  (19,5),(15,3)(7,2),(10, 10) will generate same result
> >> [14,24]
> >> [12,18] => merged to [12,24]
> >> [5,9] => no merge
> >> [0,20] => merged to [0,24]
> >>
> >> Note that symmetric-merge would require us to change the way how Kafka
> >> Steams fetch windows now, instead of fetching range from timestamp-gap
> to
> >> timestamp+gap, we will need to fetch all windows that are not expired
> yet.
> >> On the other hand, I'm not sure how this will impact the current logic
> of
> >> how a window is considered as closed, because the window doesn't carry
> end
> >> timestamp anymore, but end timestamp + gap.
> >>
> >> So do you guys think forward-merge approach used by Flink makes more
> sense
> >> in Kafka Streams, or symmetric-merge makes more sense? Both of them
> seems
> >> to me can give deterministic result.
> >>
> >> BTW I'll add the use case into original KIP.
> >>
> >> Lei
> >>
> >>
> >> On Tue, Sep 11, 2018 at 5:45 PM Matthias J. Sax <ma...@confluent.io>
> >> wrote:
> >>
> >>> Thanks for explaining your understanding. And thanks for providing more
> >>> details about the use-case. Maybe you can add this to the KIP?
> >>>
> >>>
> >>> First one general comment. I guess that my and Guozhangs understanding
> >>> about gap/close/gracePeriod is the same as yours -- we might not have
> >>> use the term precisely correct in previous email.
> >>>
> >>>
> >>> To you semantics of gap in detail:
> >>>
> >>>> I thought when (15,3) is received, kafka streams look up for neighbor
> >>>> record/window that is within the gap
> >>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created
> its
> >>> own
> >>>> window [10, 10], which is
> >>>> out of the gap, so nothing will be found and no merge occurs. Hence we
> >>> have
> >>>> two windows now in session store,
> >>>> [10, 10] and [15, 15] respectively.
> >>>
> >>> If you have record (10,10), we currently create a window of size
> >>> [10,10]. When record (15,3) arrives, your observation that the gap 3 is
> >>> too small to be merged into [10,10] window -- however, merging is a
> >>> symmetric operation and the existing window of [10,10] has a gap of 10
> >>> defined: thus, 15 is close enough to fall into the gap, and (15,3) is
> >>> merged into the existing window resulting in window [10,15].
> >>>
> >>> If we don't respect the gap both ways, we end up with inconsistencies
> if
> >>> data is out-of-order. For example, if we use the same input record
> >>> (10,10) and (15,3) from above, and it happens that (15,3) is processed
> >>> first, when processing out-of-order record (10,10) we would want to
> >>> merge both into a single window, too?
> >>>
> >>> Does this make sense?
> >>>
> >>> Now the question remains, if two records with different gap parameter
> >>> are merged, which gap should we apply for processing/merging future
> >>> records into the window? It seems, that we should use the gap parameter
> >>> from the record with this larges timestamp. In the example above
> (15,3).
> >>> We would use gap 3 after merging independent of the order of
> processing.
> >>>
> >>>
> >>>> Also another thing worth mentioning is that, the session window object
> >>>> created in current kafka streams
> >>>> implementation doesn't have gap info, it has start and end, which is
> the
> >>>> earliest and latest event timestamp
> >>>> in that window interval, i.e for (10,10), the session window gets
> created
> >>>> is [10,10], rather than [10,20]. Just to clarify
> >>>> so that it's clear why (10,10) cannot be fetched when looking for gap
> of
> >>>> (15,3), it's because the end boundary 10 of
> >>>> [10,10] is smaller than search boundary [12,18].
> >>>
> >>> We don't need to store the gap, because the gap is know from the window
> >>> definition. The created window size depends on the data that is
> >>> contained in the window. I guess one could define it differently, too,
> >>> ie, for the (10,10) record, we create a window [0,20] -- not sure if it
> >>> makes a big difference in practice though. Note, that creating window
> >>> [10,20] would not be correct, because the gap must be applied in both
> >>> directions, not just into the future.
> >>>
> >>> About the second part: the search would not be applied from (15,3) in
> >>> range [12,18], but from existing window [10,10] into range [0,20] and
> 15
> >>> is contained there. This example also shows, that we would need to come
> >>> up with a clever way, to identify window [10,10] when processing (15,3)
> >>> -- not sure atm how to do this. However, only consider (15,3) would
> >>> result in inconsistencies for out-of-order data as pointed out above
> and
> >>> would not be sufficient.
> >>>
> >>>
> >>> Does this make sense?
> >>>
> >>>
> >>> Or is there another way to define dynamic session gap semantics in a
> >>> deterministic way with regard to out-of-order data?
> >>>
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 9/11/18 4:28 PM, Lei Chen wrote:
> >>>> Thanks Matthias and Guozhang for the response.
> >>>>
> >>>> Seems like our understanding mainly differs in the semantics of gap in
> >>>> session windows.
> >>>>
> >>>> My understanding is that gap is used to merge nearby records together
> >>> such
> >>>> that no record
> >>>> in the merged window has distance later than gap. In Kafka Streams's
> >>>> implementation it's
> >>>> mainly used to find neighbor records/windows in session store so that
> >>>> nearby records can
> >>>> be merge. It is NOT used to determine when a window should be closed,
> >>> which
> >>>> is in
> >>>> fact determined by window's grace period.
> >>>>
> >>>> Guozhang you said "b. When later we received (15, 3), it means that
> this
> >>>> record ** changed **
> >>>> the window gap interval from 10 to 3, and hence we received a new
> record
> >>> at
> >>>> 15, with the new window gap of 3, it means that by timestamp 18 (15 +
> 3)
> >>> if
> >>>> we have not received any new data, the window should be closed, i.e.
> the
> >>>> window is now [10, 18) which includes two records at 10 and 15."
> >>>>
> >>>> This is different from what i thought will happen.
> >>>>
> >>>> I thought when (15,3) is received, kafka streams look up for neighbor
> >>>> record/window that is within the gap
> >>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created
> its
> >>> own
> >>>> window [10, 10], which is
> >>>> out of the gap, so nothing will be found and no merge occurs. Hence we
> >>> have
> >>>> two windows now in session store,
> >>>> [10, 10] and [15, 15] respectively.
> >>>>
> >>>> Also another thing worth mentioning is that, the session window object
> >>>> created in current kafka streams
> >>>> implementation doesn't have gap info, it has start and end, which is
> the
> >>>> earliest and latest event timestamp
> >>>> in that window interval, i.e for (10,10), the session window gets
> created
> >>>> is [10,10], rather than [10,20]. Just to clarify
> >>>> so that it's clear why (10,10) cannot be fetched when looking for gap
> of
> >>>> (15,3), it's because the end boundary 10 of
> >>>> [10,10] is smaller than search boundary [12,18].
> >>>>
> >>>> Please correct me if my understanding is wrong here.
> >>>>
> >>>> @Matthias, to answer your use case question, we have a use case where
> >>>> asynchronous time series data
> >>>> are received in the stream, from different contributors, with
> different
> >>>> quality and at different pace.
> >>>> Inside Kafka Streams, we use state to maintain statistical aggregations
> and
> >>>> other mathematical models to track
> >>>> the liquidity and calculate time decay rate and dynamic gap, so that
> at
> >>>> runtime, for each contributor we can
> >>>> 1. determine how many historical records we should maintain in state.
> >>>> 2. for each incoming record, output a record using aggregations from
> >>>> *nearby* records from that contributor.
> >>>> Why doesn't a fixed gap session window work here? Because the
> definition of
> >>>> "nearby" here is determined by
> >>>> several very dynamic factors in our case, it changes not only with
> >>>> different hours in a day, but also related to
> >>>> other contributors.
> >>>> The purpose of this KIP is to suggest a dynamic session window
> >>>> implementation so that we can embed such
> >>>> dynamic "nearby" calculation capability into kafka streams session
> >>> windows
> >>>> semantics. Hope it makes sense to you.
> >>>>
> >>>> Lei
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Mon, Sep 10, 2018 at 5:27 PM Guozhang Wang <wa...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> Hello Lei,
> >>>>>
> >>>>> As Matthias mentioned, the key question here is that because of the
> late
> >>>>> arrivals of records which may indicate a shorter session gap
> interval,
> >>> some
> >>>>> session windows may be "mistakenly" merged and hence need to be
> undone
> >>> the
> >>>>> merge, i.e. to split them again.
> >>>>>
> >>>>> Back to my example, you are right that the processing result of
> >>>>>
> >>>>> (10, 10), (19, 5), (15, 3) ..
> >>>>>
> >>>>> should be the same as the processing result of
> >>>>>
> >>>>> (10, 10), (15, 3), (19, 5) ..
> >>>>>
> >>>>> Note that the second value is NOT the window end time, but the
> extracted
> >>>>> window gap interval, as you suggested in the KIP this value can be
> >>>>> dynamically changed
> >>>>>
> >>>>> a. If you take a look at the second ordering, when we receive (10,
> 10)
> >>> it
> >>>>> means a window starting at 10 is created, and its gap interval is 10,
> >>> which
> >>>>> means that if by the timestamp of 20 we do not receive any new data,
> >>> then
> >>>>> the window should be closed, i.e. the window [10, 20).
> >>>>>
> >>>>> b. When later we received (15, 3), it means that this record **
> changed
> >>> **
> >>>>> the window gap interval from 10 to 3, and hence we received a new
> >>> record at
> >>>>> 15, with the new window gap of 3, it means that by timestamp 18 (15 +
> >>> 3) if
> >>>>> we have not received any new data, the window should be closed, i.e.
> the
> >>>>> window is now [10, 18) which includes two records at 10 and 15.
> >>>>>
> >>>>> c. The third record is received at 19, which is after the window
> close
> >>> time
> >>>>> 18, it means that we should now start a new window starting at 19,
> i.e.
> >>> the
> >>>>> window is [19, 24),
> >>>>>
> >>>>>
> >>>>> BUT, because of the out of ordering, we did not receive (15, 3) in
> time,
> >>>>> but received (19, 5), it will cause us to mistakenly merge the
> window of
> >>>>> [10, 20) with [19, 24) to [10, 24), and only when later we received
> >>> (15, 3)
> >>>>> we realized that the previous window should have been ended at 18.
> >>>>>
> >>>>> Does that make sense to you?
> >>>>>
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>> On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax <
> matthias@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> I cannot follow the example:
> >>>>>>
> >>>>>>>> (10, 10), (15, 3), (19, 5) ...
> >>>>>>
> >>>>>> First, [10,10] is created, second the window is extended to [10,15],
> >>> and
> >>>>>> third [19,19] is created. Why would there be a [15,15]? And why
> would
> >>>>>> (19,5) be merged into [15,15] -- the gap was set to 3 via (15,3) and
> >>>>>> thus [19,19] should be its own window?
> >>>>>>
> >>>>>>> Take a look at another example,
> >>>>>>> (13, 3),  (19, 5), (15, 3) ...
> >>>>>>>
> >>>>>>> in this case when (15, 3) is received, [13,13] should be retrieved
> and
> >>>>>>> merged to a new window [13, 15], then [19,19] should be updated to
> >>> [13,
> >>>>>>> 19]. Correct?
> >>>>>>
> >>>>>> This example makes sense. However, Guozhang's example was different.
> >>> The
> >>>>>> late event _reduces_ the gap and this can lead to a window split.
> >>>>>> Guozhang's example was
> >>>>>>
> >>>>>>>>> (10, 10), (19, 5), (15, 3) ...
> >>>>>>
> >>>>>> First [10,10] is created, second [10,19] is created (gap is 10, so 10
> >>> and
> >>>>>> 19 merge). Last, (15,3) reduced the gap from 10 to 3, thus [10,15]
> and
> >>>>>> [19,19] must be two windows, ie, original window [10,19] must be
> split.
> >>>>>>
> >>>>>>
> >>>>>> Or maybe you have different semantic about gaps are dynamically
> >>> modified
> >>>>>> in mind? It's a little unclear for the KIP itself what semantics
> >>> dynamic
> >>>>>> sessions windows should have.
> >>>>>>
> >>>>>>
> >>>>>> What is also unclear to me atm is, what use cases you have in mind?
> The
> >>>>>> KIP only says
> >>>>>>
> >>>>>>> the statistical aggregation result, liquidity of the records,
> >>>>>>
> >>>>>>
> >>>>>> I am not sure what this means. Can you elaborate?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On 8/30/18 3:32 PM, Lei Chen wrote:
> >>>>>>> Hi Guozhang,
> >>>>>>>
> >>>>>>> Thanks for reviewing the proposal. I didn't think of out of order
> >>>>> events
> >>>>>>> and glad that you brought it up.
> >>>>>>>
> >>>>>>> In the example you gave,
> >>>>>>>
> >>>>>>> (10, 10), (19, 5), (15, 3) ...
> >>>>>>>
> >>>>>>> my understanding is that the correct result window should be the
> same
> >>>>> as
> >>>>>> in
> >>>>>>> order events
> >>>>>>>
> >>>>>>> (10, 10), (15, 3), (19, 5) ...
> >>>>>>>
> >>>>>>> when (15, 3) is received, [15,15] is created
> >>>>>>> when (19, 5) is received, [15, 15] and [19, 19] are merged and [15,
> >>> 19]
> >>>>>> is
> >>>>>>> created, meanwhile [15,15] is removed
> >>>>>>>
> >>>>>>> back to out of order case,
> >>>>>>>
> >>>>>>> when (19 ,5) is received, [19, 19] is created
> >>>>>>> when (15, 3) is received, in order to generate the same result,
> >>>>>>> 1. if late event is later than retention period, it will be dropped
> >>>>>>> 2. otherwise, adjacent session windows within gap should be
> retrieved
> >>>>> and
> >>>>>>> merged accordingly, in this case [19, 19], and create a new session
> >>>>> [15,
> >>>>>> 19]
> >>>>>>> I'm little confused when you said "the window [15, 15] SHOULD
> actually
> >>>>> be
> >>>>>>> expired at 18 and hence the next record (19, 5) should be for a new
> >>>>>> session
> >>>>>>> already.". If i understand it correctly, the expiration of the
> window
> >>>>> is
> >>>>>>> only checked when next event (19,5) comes and then it should be
> merged
> >>>>> to
> >>>>>>> it. [15, 15] will then be closed. Is that also what you meant?
> >>>>>>> I cannot think of a case where a window will be split by a late
> event,
> >>>>>>> because if event A and C fall into the same session window, a late
> >>>>> event
> >>>>>> B
> >>>>>>> in middle will definitely fall into C's gap as well. IOW, late
> event
> >>>>> will
> >>>>>>> only cause window extension, not split.
> >>>>>>>
> >>>>>>> Take a look at another example,
> >>>>>>> (13, 3),  (19, 5), (15, 3) ...
> >>>>>>>
> >>>>>>> in this case when (15, 3) is received, [13,13] should be retrieved
> and
> >>>>>>> merged to a new window [13, 15], then [19,19] should be updated to
> >>> [13,
> >>>>>>> 19]. Correct?
> >>>>>>>
> >>>>>>> To be able to achieve that, like you said, the gap needs to be
> stored
> >>>>> for
> >>>>>>> sessions. We don't need to save the gap with each event, but only
> for
> >>>>>> each
> >>>>>>> session window. To avoid upgrading existing session window, how
> about
> >>>>>>> create a new Window type extended from SessionWindow along with a
> new
> >>>>>>> KeySchema?
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Lei
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <wa...@gmail.com>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hello Lei,
> >>>>>>>>
> >>>>>>>> Thanks for the proposal. I've just made a quick pass over it and
> >>> there
> >>>>>> is a
> >>>>>>>> question I have:
> >>>>>>>>
> >>>>>>>> The session windows are defined per key, i.e. does that mean that
> >>> each
> >>>>>>>> incoming record of the key can dynamically change the gap of the
> >>>>> window?
> >>>>>>>> For example, say you have the following record for the same key
> >>> coming
> >>>>>> in
> >>>>>>>> order, where the first time is the timestamp of the record, and
> the
> >>>>>> second
> >>>>>>>> value is the extracted gap value:
> >>>>>>>>
> >>>>>>>> (10, 10), (19, 5), ...
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> When we receive the first record at time 10, the gap is extracted
> as
> >>>>> 10,
> >>>>>>>> and hence the window will be expired at 20 if no other record is
> >>>>>> received.
> >>>>>>>> When we receive the second record at time 19, the gap is modified
> to
> >>>>> 5,
> >>>>>> and
> >>>>>>>> hence the window will be expired at 24 if no other record is
> >>> received.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> If that's the case, I'm wondering how out-of-order data can be
> >>> handled
> >>>>>>>> then, consider this stream:
> >>>>>>>>
> >>>>>>>> (10, 10), (19, 5), (15, 3) ...
> >>>>>>>>
> >>>>>>>> I.e. you received a late record indicating at timestamp 15, which
> >>>>>> shortens
> >>>>>>>> the gap to 3. It means that the window SHOULD actually be expired
> at
> >>>>> 18,
> >>>>>>>> and hence the next record (19, 5) should be for a new session
> >>> already.
> >>>>>>>> Today Streams session window implementation does not do "window
> >>>>> split",
> >>>>>> so
> >>>>>>>> have you thought about how this can be extended?
> >>>>>>>>
> >>>>>>>> Also since in your proposal each session window's gap value would
> be
> >>>>>>>> different, we need to store this value along with each record
> then,
> >>>>> how
> >>>>>>>> would we store it, and what would be the upgrade path if it is
> not a
> >>>>>>>> compatible change on disk storage etc?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <le...@gmail.com>
> wrote:
> >>>>>>>>
> >>>>>>>>> Hi All,
> >>>>>>>>>
> >>>>>>>>> I created a KIP to add dynamic gap session window support to
> Kafka
> >>>>>>>> Streams
> >>>>>>>>> DSL.
> >>>>>>>>>
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>>>>> 362%3A+Support+dynamic+gap+session+window
> >>>>>>>>>
> >>>>>>>>> Please take a look,
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Lei
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >
>
>

Re: [DISCUSS] KIP-362: Dynamic Session Window Support

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Any update on this KIP?

On 9/20/18 3:30 PM, Matthias J. Sax wrote:
> Thanks for following up. Very nice examples!
> 
> I think that the window definition for Flink is semantically
> questionable. If there is only a single record, why is the window
> defined as [ts, ts+gap]? To me, this definition is not sound and seems
> to be arbitrary. To define the windows as [ts-gap,ts+gap] as you mention
> would be semantically more useful -- still, I think that defining the
> window as [ts,ts] as we do currently in Kafka Streams is semantically
> the best.
> 
> I have the impression that Flink only defines them differently because
> it solves the issues in the implementation. (I.e., an implementation
> detail leaks into the semantics, which is usually not desired.)
> 
> However, I believe that we could change the implementation accordingly.
> We could store the windowed keys as [ts-gap,ts+gap] (or [ts,ts+gap]) in
> RocksDB, but at the API level we return [ts,ts]. This way, we can still find
> all windows we need and provide the same deterministic behavior and keep
> the current window boundaries on the semantic level (there is no need to
> store the window start and/or end time). With this technique, we can
> also implement dynamic session gaps. I think we would need to store the
> gap used for each window, too. But again, this would be an
> implementation detail.
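The storage idea above could look roughly like the following sketch (hypothetical names, not the actual Streams internals): a stored session keeps its logical bounds plus the gap in effect, and derives the widened key range used for range fetches.

```java
// Sketch of the storage idea: persist the session under the widened range
// [start - gap, end + gap] so range fetches find mergeable neighbors, but
// expose only the logical bounds [start, end] at the API level.
// All names here are illustrative, not Kafka Streams internals.
public class StoredSession {
    final long start;   // earliest record timestamp in the session
    final long end;     // latest record timestamp in the session
    final long gap;     // gap in effect for this session, stored alongside it

    StoredSession(long start, long end, long gap) {
        this.start = start; this.end = end; this.gap = gap;
    }

    long fetchLowerBound() { return start - gap; }  // physical key range lower bound
    long fetchUpperBound() { return end + gap; }    // physical key range upper bound

    // A record at ts can merge with this session iff it falls in the widened range.
    boolean canMerge(long ts) { return ts >= fetchLowerBound() && ts <= fetchUpperBound(); }

    public static void main(String[] args) {
        StoredSession s = new StoredSession(10, 10, 10); // record (10,10) -> logical [10,10]
        System.out.println(s.canMerge(15));              // 15 falls in widened range [0,20]
    }
}
```

With this layout, the (15,3)/(10,10) example works in either arrival order, because the widened range applies the gap in both directions.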
> 
> Let's see what others think.
> 
> One tricky question we would need to address is how we can be backward
> compatible. I am currently working on KIP-258, which should help to
> address this backward compatibility issue though.
> 
> 
> -Matthias
> 
> 
> 
> On 9/19/18 5:17 PM, Lei Chen wrote:
>> Thanks Matthias. That makes sense.
>>
>> You're right that symmetric merge is necessary to ensure consistency. On
>> the other hand, I kinda feel it defeats the purpose of dynamic gap, which
>> is to update the gap from the old value to the new value. Symmetric merge
>> always honors the larger gap in both directions, rather than honoring the gap
>> carried by the record with the larger timestamp. I wasn't able to find any semantic
>> definitions w.r.t. this particular aspect online, but spent some time
>> looking into other streaming engines like Apache Flink.
>>
>> Apache Flink defines the window differently: it uses (start time, start
>> time + gap).
>>
>> so our previous example (10, 10), (19,5),(15,3) in Flink's case will be:
>> [10,20]
>> [19,24] => merged to [10,24]
>> [15,18] => merged to [10,24]
>>
>> while example (15,3)(19,5)(10,10) will be
>> [15,18]
>> [19,24] => no merge
>> [10,20] => merged to [10,24]
>>
>> however, since it only applies the gap in the future direction, not the past,
>> a late record might not trigger a merge where symmetric merge would.
>> (7,2),(10, 10), (19,5),(15,3)
>> [7,9]
>> [10,20]
>> [19,24] => merged to [10,24]
>> [15,18] => merged to [10,24]
>> so at the end
>> two windows [7,9][10,24] are there.
>>
>> As you can see, in Flink the gap semantics lean toward the view that
>> a gap carried by one record only affects how this record merges with future
>> records, e.g. a later event (T2, G2) will only be merged with (T1, G1) if
>> T2 is less than T1+G1, but not when T1 is less than T2 - G2. Let's call
>> this the "forward-merge" way of handling this. I just went through some source
>> code, and if my understanding of Flink's implementation is incorrect,
>> please correct me.
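A rough sketch of the forward-merge behavior described above (illustrative Java, not Flink's actual implementation): each record (ts, gap) opens a window [ts, ts + gap], and any overlapping windows are merged.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of Flink-style "forward merge" for dynamic session gaps.
// A record (ts, gap) opens a window [ts, ts + gap]; windows that
// overlap are merged. Names are illustrative, not Flink's API.
public class ForwardMergeSketch {
    static final class Window {
        long start, end;
        Window(long start, long end) { this.start = start; this.end = end; }
        boolean overlaps(Window o) { return start <= o.end && o.start <= end; }
    }

    static List<Window> process(long[][] records) {
        List<Window> windows = new ArrayList<>();
        for (long[] r : records) {
            Window w = new Window(r[0], r[0] + r[1]);   // [ts, ts + gap]
            List<Window> next = new ArrayList<>();
            for (Window existing : windows) {
                if (existing.overlaps(w)) {             // merge overlapping windows
                    w = new Window(Math.min(w.start, existing.start),
                                   Math.max(w.end, existing.end));
                } else {
                    next.add(existing);
                }
            }
            next.add(w);
            windows = next;
        }
        return windows;
    }

    public static void main(String[] args) {
        // (7,2), (10,10), (19,5), (15,3) -> windows [7,9] and [10,24]
        for (Window w : process(new long[][]{{7, 2}, {10, 10}, {19, 5}, {15, 3}})) {
            System.out.println("[" + w.start + "," + w.end + "]");
        }
    }
}
```

Running it on (7,2),(10,10),(19,5),(15,3) leaves the two windows [7,9] and [10,24], matching the walk-through above: the late record (7,2) never merges because its gap only reaches forward.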
>>
>> On the other hand, if we want to do symmetric merge in Kafka Streams, we
>> can change the window definition to [start time - gap, start time + gap].
>> This way the example (7,2),(10, 10), (19,5),(15,3) will be
>> [5,9]
>> [0,20] => merged to [0,20]
>> [14,24] => merged to [0,24]
>> [12,18] => merged to [0,24]
>>
>> (19,5),(15,3),(7,2),(10, 10) will generate the same result
>> [14,24]
>> [12,18] => merged to [12,24]
>> [5,9] => no merge
>> [0,20] => merged to [0,24]
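The symmetric-merge alternative can be sketched the same way; the only change is that a record (ts, gap) opens the window [ts - gap, ts + gap] (illustrative code, not Streams internals).

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of symmetric merge: a record (ts, gap) opens a window
// [ts - gap, ts + gap], and overlapping windows are merged, so the
// gap is honored in both directions. Illustrative only.
public class SymmetricMergeSketch {
    static List<long[]> process(long[][] records) {
        List<long[]> windows = new ArrayList<>();     // each window is {start, end}
        for (long[] r : records) {
            long[] w = {r[0] - r[1], r[0] + r[1]};    // [ts - gap, ts + gap]
            List<long[]> next = new ArrayList<>();
            for (long[] e : windows) {
                if (e[0] <= w[1] && w[0] <= e[1]) {   // overlap -> merge
                    w[0] = Math.min(w[0], e[0]);
                    w[1] = Math.max(w[1], e[1]);
                } else {
                    next.add(e);
                }
            }
            next.add(w);
            windows = next;
        }
        return windows;
    }

    public static void main(String[] args) {
        // Both orderings of the example end up with the single window [0,24].
        for (long[] w : process(new long[][]{{7, 2}, {10, 10}, {19, 5}, {15, 3}}))
            System.out.println("[" + w[0] + "," + w[1] + "]");
        for (long[] w : process(new long[][]{{19, 5}, {15, 3}, {7, 2}, {10, 10}}))
            System.out.println("[" + w[0] + "," + w[1] + "]");
    }
}
```

Both arrival orders collapse to the single window [0,24], which is the order-independence property discussed in this thread.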
>>
>> Note that symmetric merge would require us to change the way Kafka
>> Streams fetches windows now: instead of fetching the range from timestamp-gap to
>> timestamp+gap, we will need to fetch all windows that are not expired yet.
>> On the other hand, I'm not sure how this will impact the current logic of
>> how a window is considered closed, because the window wouldn't carry the end
>> timestamp anymore, but end timestamp + gap.
>>
>> So do you think the forward-merge approach used by Flink makes more sense
>> in Kafka Streams, or does symmetric merge make more sense? Both of them, it
>> seems to me, can give a deterministic result.
>>
>> BTW I'll add the use case to the original KIP.
>>
>> Lei
>>
>>
>> On Tue, Sep 11, 2018 at 5:45 PM Matthias J. Sax <ma...@confluent.io>
>> wrote:
>>
>>> Thanks for explaining your understanding. And thanks for providing more
>>> details about the use-case. Maybe you can add this to the KIP?
>>>
>>>
>>> First, one general comment. I guess that my and Guozhang's understanding
>>> of gap/close/gracePeriod is the same as yours -- we might not have
>>> used the terms precisely correctly in previous emails.
>>>
>>>
>>> To your semantics of gap in detail:
>>>
>>>> I thought when (15,3) is received, kafka streams look up for neighbor
>>>> record/window that is within the gap
>>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created its
>>> own
>>>> window [10, 10], which is
>>>> out of the gap, so nothing will be found and no merge occurs. Hence we
>>> have
>>>> two windows now in session store,
>>>> [10, 10] and [15, 15] respectively.
>>>
>>> If you have record (10,10), we currently create a window of size
>>> [10,10]. When record (15,3) arrives, you observe that the gap 3 is
>>> too small for it to be merged into the [10,10] window -- however, merging
>>> is a symmetric operation, and the existing window [10,10] has a gap of 10
>>> defined: thus, 15 is close enough to fall into the gap, and (15,3) is
>>> merged into the existing window, resulting in window [10,15].
>>>
>>> If we don't respect the gap both ways, we end up with inconsistencies if
>>> data is out-of-order. For example, if we use the same input records
>>> (10,10) and (15,3) from above, and it happens that (15,3) is processed
>>> first, when processing out-of-order record (10,10) we would want to
>>> merge both into a single window, too?
>>>
>>> Does this make sense?
>>>
>>> Now the question remains: if two records with different gap parameters
>>> are merged, which gap should we apply for processing/merging future
>>> records into the window? It seems that we should use the gap parameter
>>> from the record with the largest timestamp -- in the example above, (15,3).
>>> We would use gap 3 after merging, independent of the order of processing.
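One way to sketch this rule (hypothetical helper, not actual Streams code): after a merge, the window keeps the gap of the record with the largest timestamp, which makes the resulting gap independent of arrival order.

```java
// Sketch of the proposed rule: a window is tracked as {start, end, gap},
// and after merging in a record the window keeps the gap of the record
// with the largest timestamp seen so far. Hypothetical helper only.
public class GapAfterMerge {
    static long[] merge(long[] window, long ts, long gap) {
        long[] merged = {Math.min(window[0], ts), Math.max(window[1], ts), 0};
        // Keep the gap from the record with the largest timestamp.
        merged[2] = (ts >= window[1]) ? gap : window[2];
        return merged;
    }

    public static void main(String[] args) {
        long[] w = {10, 10, 10};      // record (10,10) -> window [10,10], gap 10
        w = merge(w, 15, 3);          // (15,3) falls into [0,20] -> merge
        // Window is now [10,15] with effective gap 3, taken from record (15,3).
        System.out.println("[" + w[0] + "," + w[1] + "] gap=" + w[2]);
    }
}
```

Processing the same two records out of order, i.e. starting from window [15,15] with gap 3 and merging in (10,10), yields the same [10,15] with gap 3.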
>>>
>>>
>>>> Also another thing worth mentioning is that, the session window object
>>>> created in current kafka streams
>>>> implementation doesn't have gap info, it has start and end, which is the
>>>> earliest and latest event timestamp
>>>> in that window interval, i.e for (10,10), the session window gets created
>>>> is [10,10], rather than [10,20]. Just to clarify
>>>> so that it's clear why (10,10) cannot be fetched when looking for gap of
>>>> (15,3), it's because the end boundary 10 of
>>>> [10,10] is smaller than search boundary [12,18].
>>>
>>> We don't need to store the gap, because the gap is known from the window
>>> definition. The created window size depends on the data that is
>>> contained in the window. I guess one could define it differently, too,
>>> i.e., for the (10,10) record, we create a window [0,20] -- not sure if it
>>> makes a big difference in practice though. Note that creating window
>>> [10,20] would not be correct, because the gap must be applied in both
>>> directions, not just into the future.
>>>
>>> About the second part: the search would not be applied from (15,3) in
>>> range [12,18], but from existing window [10,10] into range [0,20] and 15
>>> is contained there. This example also shows, that we would need to come
>>> up with a clever way, to identify window [10,10] when processing (15,3)
>>> -- not sure atm how to do this. However, only consider (15,3) would
>>> result in inconsistencies for out-of-order data as pointed out above and
>>> would not be sufficient.
>>>
>>>
>>> Does this make sense?
>>>
>>>
>>> Or is there another way to define dynamic session gap semantics in a
>>> deterministic way with regard to out-of-order data?
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 9/11/18 4:28 PM, Lei Chen wrote:
>>>> Thanks Matthias and Guozhang for the response.
>>>>
>>>> Seems like our understanding mainly differs in the semantics of gap in
>>>> session windows.
>>>>
>>>> My understanding is that gap is used to merge nearby records together
>>> such
>>>> that no record
>>>> in the merged window has distance larger than the gap. In Kafka Streams's
>>>> implementation it's
>>>> mainly used to find neighbor records/windows in session store so that
>>>> nearby records can
>>>> be merged. It is NOT used to determine when a window should be closed,
>>> which
>>>> is in
>>>> fact determined by window's grace period.
>>>>
>>>> Guozhang you said "b. When later we received (15, 3), it means that this
>>>> record ** changed **
>>>> the window gap interval from 10 to 3, and hence we received a new record
>>> at
>>>> 15, with the new window gap of 3, it means that by timestamp 18 (15 + 3)
>>> if
>>>> we have not received any new data, the window should be closed, i.e. the
>>>> window is now [10, 18) which includes two records at 10 and 15."
>>>>
>>>> This is different from what i thought will happen.
>>>>
>>>> I thought when (15,3) is received, kafka streams look up for neighbor
>>>> record/window that is within the gap
>>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created its
>>> own
>>>> window [10, 10], which is
>>>> out of the gap, so nothing will be found and no merge occurs. Hence we
>>> have
>>>> two windows now in session store,
>>>> [10, 10] and [15, 15] respectively.
>>>>
>>>> Also another thing worth mentioning is that, the session window object
>>>> created in current kafka streams
>>>> implementation doesn't have gap info, it has start and end, which is the
>>>> earliest and latest event timestamp
>>>> in that window interval, i.e for (10,10), the session window gets created
>>>> is [10,10], rather than [10,20]. Just to clarify
>>>> so that it's clear why (10,10) cannot be fetched when looking for gap of
>>>> (15,3), it's because the end boundary 10 of
>>>> [10,10] is smaller than search boundary [12,18].
>>>>
>>>> Please correct me if my understanding is wrong here.
>>>>
>>>> @Matthias, to answer your use case question, we have a use case where
>>>> asynchronous time series data
>>>> are received in the stream, from different contributors, with different
>>>> quality and at different pace.
>>>> Inside Kafka Streams, we use state to maintain statistical aggregations and
>>>> other mathematical models to track
>>>> the liquidity and calculate time decay rate and dynamic gap, so that at
>>>> runtime, for each contributor we can
>>>> 1. determine how many historical records we should maintain in state.
>>>> 2. for each incoming record, output a record using aggregations from
>>>> *nearby* records from that contributor.
>>>> Why doesn't a fixed gap session window work here? Because the definition of
>>>> "nearby" here is determined by
>>>> several very dynamic factors in our case, it changes not only with
>>>> different hours in a day, but also related to
>>>> other contributors.
>>>> The purpose of this KIP is to suggest a dynamic session window
>>>> implementation so that we can embed such
>>>> dynamic "nearby" calculation capability into kafka streams session
>>> windows
>>>> semantics. Hope it makes sense to you.
>>>>
>>>> Lei
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Sep 10, 2018 at 5:27 PM Guozhang Wang <wa...@gmail.com>
>>> wrote:
>>>>
>>>>> Hello Lei,
>>>>>
>>>>> As Matthias mentioned, the key question here is that because of the late
>>>>> arrivals of records which may indicate a shorter session gap interval,
>>> some
>>>>> session windows may be "mistakenly" merged and hence need to be undone
>>> the
>>>>> merge, i.e. to split them again.
>>>>>
>>>>> Back to my example, you are right that the processing result of
>>>>>
>>>>> (10, 10), (19, 5), (15, 3) ..
>>>>>
>>>>> should be the same as the processing result of
>>>>>
>>>>> (10, 10), (15, 3), (19, 5) ..
>>>>>
>>>>> Note that the second value is NOT the window end time, but the extracted
>>>>> window gap interval, as you suggested in the KIP this value can be
>>>>> dynamically changed
>>>>>
>>>>> a. If you take a look at the second ordering, when we receive (10, 10)
>>> it
>>>>> means a window starting at 10 is created, and its gap interval is 10,
>>> which
>>>>> means that if by the timestamp of 20 we do not receive any new data,
>>> then
>>>>> the window should be closed, i.e. the window [10, 20).
>>>>>
>>>>> b. When later we received (15, 3), it means that this record ** changed
>>> **
>>>>> the window gap interval from 10 to 3, and hence we received a new
>>> record at
>>>>> 15, with the new window gap of 3, it means that by timestamp 18 (15 +
>>> 3) if
>>>>> we have not received any new data, the window should be closed, i.e. the
>>>>> window is now [10, 18) which includes two records at 10 and 15.
>>>>>
>>>>> c. The third record is received at 19, which is after the window close
>>> time
>>>>> 18, it means that we should now start a new window starting at 19, i.e.
>>> the
>>>>> window is [19, 24),
>>>>>
>>>>>
>>>>> BUT, because of the out of ordering, we did not receive (15, 3) in time,
>>>>> but received (19, 5), it will cause us to mistakenly merge the window of
>>>>> [10, 20) with [19, 24) to [10, 24), and only when later we received
>>> (15, 3)
>>>>> we realized that the previous window should have been ended at 18.
>>>>>
>>>>> Does that make sense to you?
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax <ma...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> I cannot follow the example:
>>>>>>
>>>>>>>> (10, 10), (15, 3), (19, 5) ...
>>>>>>
>>>>>> First, [10,10] is created, second the window is extended to [10,15],
>>> and
>>>>>> third [19,19] is created. Why would there be a [15,15]? And why would
>>>>>> (19,5) be merged into [15,15] -- the gap was set to 3 via (15,3) and
>>>>>> thus [19,19] should be its own window?
>>>>>>
>>>>>>> Take a look at another example,
>>>>>>> (13, 3),  (19, 5), (15, 3) ...
>>>>>>>
>>>>>>> in this case when (15, 3) is received, [13,13] should be retrieved and
>>>>>>> merged to a new window [13, 15], then [19,19] should be updated to
>>> [13,
>>>>>>> 19]. Correct?
>>>>>>
>>>>>> This example makes sense. However, Guozhang's example was different.
>>> The
>>>>>> late event _reduces_ the gap and this can lead to a window split.
>>>>>> Guozhang's example was
>>>>>>
>>>>>>>>> (10, 10), (19, 5), (15, 3) ...
>>>>>>
>>>>>> First [10,10] is created, second [10,19] is created (gap is 10, so 10
>>> and
>>>>>> 19 merge). Last, (15,3) reduced the gap from 10 to 3, thus [10,15] and
>>>>>> [19,19] must be two windows, ie, original window [10,19] must be split.
>>>>>>
>>>>>>
>>>>>> Or maybe you have different semantic about gaps are dynamically
>>> modified
>>>>>> in mind? It's a little unclear for the KIP itself what semantics
>>> dynamic
>>>>>> sessions windows should have.
>>>>>>
>>>>>>
>>>>>> What is also unclear to me atm is, what use cases you have in mind? The
>>>>>> KIP only says
>>>>>>
>>>>>>> the statistical aggregation result, liquidity of the records,
>>>>>>
>>>>>>
>>>>>> I am not sure what this means. Can you elaborate?
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 8/30/18 3:32 PM, Lei Chen wrote:
>>>>>>> Hi Guozhang,
>>>>>>>
>>>>>>> Thanks for reviewing the proposal. I didn't think of out of order
>>>>> events
>>>>>>> and glad that you brought it up.
>>>>>>>
>>>>>>> In the example you gave,
>>>>>>>
>>>>>>> (10, 10), (19, 5), (15, 3) ...
>>>>>>>
>>>>>>> my understanding is that the correct result window should be the same
>>>>> as
>>>>>> in
>>>>>>> order events
>>>>>>>
>>>>>>> (10, 10), (15, 3), (19, 5) ...
>>>>>>>
>>>>>>> when (15, 3) is received, [15,15] is created
>>>>>>> when (19, 5) is received, [15, 15] and [19, 19] are merged and [15,
>>> 19]
>>>>>> is
>>>>>>> created, meanwhile [15,15] is removed
>>>>>>>
>>>>>>> back to out of order case,
>>>>>>>
>>>>>>> when (19 ,5) is received, [19, 19] is created
>>>>>>> when (15, 3) is received, in order to generate the same result,
>>>>>>> 1. if late event is later than retention period, it will be dropped
>>>>>>> 2. otherwise, adjacent session windows within gap should be retrieved
>>>>> and
>>>>>>> merged accordingly, in this case [19, 19], and create a new session
>>>>> [15,
>>>>>> 19]
>>>>>>> I'm little confused when you said "the window [15, 15] SHOULD actually
>>>>> be
>>>>>>> expired at 18 and hence the next record (19, 5) should be for a new
>>>>>> session
>>>>>>> already.". If i understand it correctly, the expiration of the window
>>>>> is
>>>>>>> only checked when next event (19,5) comes and then it should be merged
>>>>> to
>>>>>>> it. [15, 15] will then be closed. Is that also what you meant?
>>>>>>> I cannot think of a case where a window will be split by a late event,
>>>>>>> because if event A and C fall into the same session window, a late
>>>>> event
>>>>>> B
>>>>>>> in middle will definitely fall into C's gap as well. IOW, late event
>>>>> will
>>>>>>> only cause window extension, not split.
>>>>>>>
>>>>>>> Take a look at another example,
>>>>>>> (13, 3),  (19, 5), (15, 3) ...
>>>>>>>
>>>>>>> in this case when (15, 3) is received, [13,13] should be retrieved and
>>>>>>> merged to a new window [13, 15], then [19,19] should be updated to
>>> [13,
>>>>>>> 19]. Correct?
>>>>>>>
>>>>>>> To be able to achieve that, like you said, the gap needs to be stored
>>>>> for
>>>>>>> sessions. We don't need to save the gap with each event, but only for
>>>>>> each
>>>>>>> session window. To avoid upgrading existing session window, how about
>>>>>>> create a new Window type extended from SessionWindow along with a new
>>>>>>> KeySchema?
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Lei
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <wa...@gmail.com>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Lei,
>>>>>>>>
>>>>>>>> Thanks for the proposal. I've just made a quick pass over it and
>>> there
>>>>>> is a
>>>>>>>> question I have:
>>>>>>>>
>>>>>>>> The session windows are defined per key, i.e. does that mean that
>>> each
>>>>>>>> incoming record of the key can dynamically change the gap of the
>>>>> window?
>>>>>>>> For example, say you have the following record for the same key
>>> coming
>>>>>> in
>>>>>>>> order, where the first time is the timestamp of the record, and the
>>>>>> second
>>>>>>>> value is the extracted gap value:
>>>>>>>>
>>>>>>>> (10, 10), (19, 5), ...
>>>>>>>>
>>>>>>>>
>>>>>>>> When we receive the first record at time 10, the gap is extracted as
>>>>> 10,
>>>>>>>> and hence the window will be expired at 20 if no other record is
>>>>>> received.
>>>>>>>> When we receive the second record at time 19, the gap is modified to
>>>>> 5,
>>>>>> and
>>>>>>>> hence the window will be expired at 24 if no other record is
>>> received.
>>>>>>>>
>>>>>>>>
>>>>>>>> If that's the case, I'm wondering how out-of-order data can be
>>> handled
>>>>>>>> then, consider this stream:
>>>>>>>>
>>>>>>>> (10, 10), (19, 5), (15, 3) ...
>>>>>>>>
>>>>>>>> I.e. you received a late record indicating at timestamp 15, which
>>>>>> shortens
>>>>>>>> the gap to 3. It means that the window SHOULD actually be expired at
>>>>> 18,
>>>>>>>> and hence the next record (19, 5) should be for a new session
>>> already.
>>>>>>>> Today Streams session window implementation does not do "window
>>>>> split",
>>>>>> so
>>>>>>>> have you thought about how this can be extended?
>>>>>>>>
>>>>>>>> Also since in your proposal each session window's gap value would be
>>>>>>>> different, we need to store this value along with each record then,
>>>>> how
>>>>>>>> would we store it, and what would be the upgrade path if it is not a
>>>>>>>> compatible change on disk storage etc?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <le...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I created a KIP to add dynamic gap session window support to Kafka
>>>>>>>> Streams
>>>>>>>>> DSL.
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>> 362%3A+Support+dynamic+gap+session+window
>>>>>>>>>
>>>>>>>>> Please take a look,
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Lei
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
>>>>
>>>
>>>
>>
> 


Re: [DISCUSS] KIP-362: Dynamic Session Window Support

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for following up. Very nice examples!

I think that the window definition for Flink is semantically
questionable. If there is only a single record, why is the window
defined as [ts, ts+gap]? To me, this definition is not sound and seems
arbitrary. Defining the windows as [ts-gap,ts+gap] as you mention
would be semantically more useful -- still, I think that defining the
window as [ts,ts], as we currently do in Kafka Streams, is semantically
the best.

I have the impression that Flink only defines them differently because
it simplifies the implementation. (I.e., an implementation detail leaks
into the semantics, which is usually not desired.)

However, I believe that we could change the implementation accordingly.
We could store the windowed keys as [ts-gap,ts+gap] (or [ts,ts+gap]) in
RocksDB, but at the API level we return [ts,ts]. This way, we can still find
all windows we need, provide the same deterministic behavior, and keep
the current window boundaries at the semantic level (there is no need to
store the window start and/or end time). With this technique, we can
also implement dynamic session gaps. I think we would need to store the
used "gap" for each window, too. But again, this would be an
implementation detail.
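To illustrate the idea, here is a minimal sketch (illustrative Python, not the actual Streams code; the class and method names are hypothetical) of keeping a gap-expanded key range in the store while exposing the [ts,ts] window at the API level:

```python
class StoredSession:
    """Hypothetical sketch: a session window as it could be persisted."""

    def __init__(self, first_ts, last_ts, gap):
        self.first_ts = first_ts  # earliest event timestamp in the window
        self.last_ts = last_ts    # latest event timestamp in the window
        self.gap = gap            # gap currently in effect for this window

    def store_key_range(self):
        # Range written to RocksDB: expanded by the gap on both sides,
        # so lookups for overlapping sessions can find this window.
        return (self.first_ts - self.gap, self.last_ts + self.gap)

    def api_window(self):
        # Window boundaries reported at the API level: [first_ts, last_ts].
        return (self.first_ts, self.last_ts)


# A single record (10, 10): stored as range [0, 20], reported as [10, 10].
s = StoredSession(10, 10, 10)
assert s.store_key_range() == (0, 20)
assert s.api_window() == (10, 10)
```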

Let's see what others think.

One tricky question we would need to address is how we can stay backward
compatible. I am currently working on KIP-258, which should help to
address this backward compatibility issue though.


-Matthias



On 9/19/18 5:17 PM, Lei Chen wrote:
> Thanks Matthias. That makes sense.
> 
> You're right that symmetric merge is necessary to ensure consistency. On
> the other hand, I kinda feel it defeats the purpose of the dynamic gap, which
> is to update the gap from the old value to the new value. The symmetric merge
> always honors the larger gap in both directions, rather than honoring the gap
> carried by the record with the larger timestamp. I wasn't able to find any semantic
> definitions w.r.t. this particular aspect online, but spent some time
> looking into other streaming engines like Apache Flink.
> 
> Apache Flink defines the window differently: it uses (start time, start
> time + gap).
> 
> so our previous example (10, 10), (19,5),(15,3) in Flink's case will be:
> [10,20]
> [19,24] => merged to [10,24]
> [15,18] => merged to [10,24]
> 
> while example (15,3)(19,5)(10,10) will be
> [15,18]
> [19,24] => no merge
> [10,20] => merged to [10,24]
> 
> however, since it only records the gap in the future direction, not the past, a late
> record might not trigger any merge where in symmetric merge it would.
> (7,2),(10, 10), (19,5),(15,3)
> [7,9]
> [10,20]
> [19,24] => merged to [10,24]
> [15,18] => merged to [10,24]
> so at the end
> two windows [7,9][10,24] are there.
> 
> As you can see, in Flink, the gap semantics lean toward a gap carried by
> one record only affecting how that record merges with future records;
> e.g. a later event (T2, G2) will only be merged with (T1, G1) if
> T2 is less than T1+G1, but not when T1 is less than T2 - G2. Let's call
> this the "forward-merge" way of handling it. I just went through some source
> code, so if my understanding of Flink's implementation is incorrect,
> please correct me.
> 
> On the other hand, if we want to do symmetric merge in Kafka Streams, we
> can change the window definition to [start time - gap, start time + gap].
> This way the example (7,2),(10, 10), (19,5),(15,3) will be
> [5,9]
> [0,20] => merged to [0,20]
> [14,24] => merged to [0,24]
> [12,18] => merged to [0,24]
> 
>  (19,5),(15,3)(7,2),(10, 10) will generate same result
> [14,24]
> [12,18] => merged to [12,24]
> [5,9] => no merge
> [0,20] => merged to [0,24]
> 
> Note that symmetric-merge would require us to change the way Kafka
> Streams fetches windows now: instead of fetching the range from timestamp-gap to
> timestamp+gap, we would need to fetch all windows that are not expired yet.
> On the other hand, I'm not sure how this will impact the current logic of
> how a window is considered closed, because the window wouldn't carry the end
> timestamp anymore, but the end timestamp + gap.
> 
> So do you think the forward-merge approach used by Flink makes more sense
> in Kafka Streams, or does symmetric-merge make more sense? Both of them seem
> to me to give deterministic results.
> 
> BTW I'll add the use case into original KIP.
> 
> Lei
> 
> 
> On Tue, Sep 11, 2018 at 5:45 PM Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> Thanks for explaining your understanding. And thanks for providing more
>> details about the use-case. Maybe you can add this to the KIP?
>>
>>
>> First one general comment. I guess that my and Guozhang's understanding
>> about gap/close/gracePeriod is the same as yours -- we might not have
>> used the terms precisely correctly in previous emails.
>>
>>
>> To your semantics of gap in detail:
>>
>>> I thought when (15,3) is received, kafka streams look up for neighbor
>>> record/window that is within the gap
>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created its
>> own
>>> window [10, 10], which is
>>> out of the gap, so nothing will be found and no merge occurs. Hence we
>> have
>>> two windows now in session store,
>>> [10, 10] and [15, 15] respectively.
>>
>> If you have record (10,10), we currently create a window of size
>> [10,10]. When record (15,3) arrives, your observation is that the gap 3 is
>> too small to merge it into the [10,10] window -- however, merging is a
>> symmetric operation and the existing window of [10,10] has a gap of 10
>> defined: thus, 15 is close enough to fall into the gap, and (15,3) is
>> merged into the existing window, resulting in window [10,15].
>>
>> If we don't respect the gap both ways, we end up with inconsistencies if
>> data is out-of-order. For example, if we use the same input record
>> (10,10) and (15,3) from above, and it happens that (15,3) is processed
>> first, when processing out-of-order record (10,10) we would want to
>> merge both into a single window, too?
>>
>> Does this make sense?
>>
>> Now the question remains: if two records with different gap parameters
>> are merged, which gap should we apply for processing/merging future
>> records into the window? It seems that we should use the gap parameter
>> from the record with the largest timestamp. In the example above, (15,3).
>> We would use gap 3 after merging, independent of the order of processing.
>>
>>
>>> Also another thing worth mentioning is that, the session window object
>>> created in current kafka streams
>>> implementation doesn't have gap info, it has start and end, which is the
>>> earliest and latest event timestamp
>>> in that window interval, i.e for (10,10), the session window gets created
>>> is [10,10], rather than [10,20]. Just to clarify
>>> so that it's clear why (10,10) cannot be fetched when looking for gap of
>>> (15,3), it's because the end boundary 10 of
>>> [10,10] is smaller than search boundary [12,18].
>>
>> We don't need to store the gap, because the gap is known from the window
>> definition. The created window size depends on the data that is
>> contained in the window. I guess one could define it differently, too,
>> i.e., for the (10,10) record, we create a window [0,20] -- not sure if it
>> makes a big difference in practice though. Note that creating window
>> [10,20] would not be correct, because the gap must be applied in both
>> directions, not just into the future.
>>
>> About the second part: the search would not be applied from (15,3) in
>> range [12,18], but from existing window [10,10] into range [0,20], and 15
>> is contained there. This example also shows that we would need to come
>> up with a clever way to identify window [10,10] when processing (15,3)
>> -- not sure atm how to do this. However, only considering (15,3) would
>> result in inconsistencies for out-of-order data as pointed out above and
>> would not be sufficient.
>>
>>
>> Does this make sense?
>>
>>
>> Or is there another way to define dynamic session gap semantics in a
>> deterministic way with regard to out-of-order data?
>>
>>
>>
>> -Matthias
>>
>>
>> On 9/11/18 4:28 PM, Lei Chen wrote:
>>> Thanks Matthias and Guozhang for the response.
>>>
>>> Seems like our understanding mainly differs in the semantics of gap in
>>> session windows.
>>>
>>> My understanding is that the gap is used to merge nearby records together
>>> such that no two records in the merged window are farther apart than the
>>> gap. In Kafka Streams's implementation it's mainly used to find neighboring
>>> records/windows in the session store so that nearby records can be merged.
>>> It is NOT used to determine when a window should be closed, which is in
>>> fact determined by the window's grace period.
>>>
>>> Guozhang you said "b. When later we received (15, 3), it means that this
>>> record ** changed **
>>> the window gap interval from 10 to 3, and hence we received a new record
>> at
>>> 15, with the new window gap of 3, it means that by timestamp 18 (15 + 3)
>> if
>>> we have not received any new data, the window should be closed, i.e. the
>>> window is now [10, 18) which includes two records at 10 and 15."
>>>
>>> This is different from what i thought will happen.
>>>
>>> I thought when (15,3) is received, kafka streams look up for neighbor
>>> record/window that is within the gap
>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created its
>> own
>>> window [10, 10], which is
>>> out of the gap, so nothing will be found and no merge occurs. Hence we
>> have
>>> two windows now in session store,
>>> [10, 10] and [15, 15] respectively.
>>>
>>> Also another thing worth mentioning is that, the session window object
>>> created in current kafka streams
>>> implementation doesn't have gap info, it has start and end, which is the
>>> earliest and latest event timestamp
>>> in that window interval, i.e for (10,10), the session window gets created
>>> is [10,10], rather than [10,20]. Just to clarify
>>> so that it's clear why (10,10) cannot be fetched when looking for gap of
>>> (15,3), it's because the end boundary 10 of
>>> [10,10] is smaller than search boundary [12,18].
>>>
>>> Please correct me if my understanding is wrong here.
>>>
>>> @Matthias, to answer your use case question, we have a use case where
>>> asynchronous time series data are received in the stream, from different
>>> contributors, with different quality and at a different pace.
>>> Inside Kafka Streams, we use state to maintain statistical aggregations and
>>> other mathematical models to track
>>> the liquidity and calculate the time decay rate and dynamic gap, so that at
>>> runtime, for each contributor we can
>>> 1. determine how many historical records we should maintain in state.
>>> 2. for each incoming record, output a record using aggregations from
>>> *nearby* records from that contributor.
>>> Why doesn't a fixed-gap session window work here? Because the definition of
>>> "nearby" is determined by several very dynamic factors in our case; it
>>> changes not only with different hours of the day, but is also related to
>>> other contributors.
>>> The purpose of this KIP is to suggest a dynamic session window
>>> implementation so that we can embed such a dynamic "nearby" calculation
>>> capability into Kafka Streams session window semantics. Hope it makes
>>> sense to you.
>>>
>>> Lei
>>>
>>>
>>>
>>>
>>> On Mon, Sep 10, 2018 at 5:27 PM Guozhang Wang <wa...@gmail.com>
>> wrote:
>>>
>>>> Hello Lei,
>>>>
>>>> As Matthias mentioned, the key question here is that because of late
>>>> arrivals of records which may indicate a shorter session gap interval,
>>>> some session windows may be "mistakenly" merged, and hence the merge
>>>> needs to be undone, i.e. they must be split again.
>>>>
>>>> Back to my example, you are right that the processing result of
>>>>
>>>> (10, 10), (19, 5), (15, 3) ..
>>>>
>>>> should be the same as the processing result of
>>>>
>>>> (10, 10), (15, 3), (19, 5) ..
>>>>
>>>> Note that the second value is NOT the window end time, but the extracted
>>>> window gap interval, as you suggested in the KIP this value can be
>>>> dynamically changed
>>>>
>>>> a. If you take a look at the second ordering, when we receive (10, 10)
>> it
>>>> means a window starting at 10 is created, and its gap interval is 10,
>> which
>>>> means that if by the timestamp of 20 we do not receive any new data,
>> then
>>>> the window should be closed, i.e. the window [10, 20).
>>>>
>>>> b. When later we received (15, 3), it means that this record ** changed
>> **
>>>> the window gap interval from 10 to 3, and hence we received a new
>> record at
>>>> 15, with the new window gap of 3, it means that by timestamp 18 (15 +
>> 3) if
>>>> we have not received any new data, the window should be closed, i.e. the
>>>> window is now [10, 18) which includes two records at 10 and 15.
>>>>
>>>> c. The third record is received at 19, which is after the window close
>> time
>>>> 18, it means that we should now start a new window starting at 19, i.e.
>> the
>>>> window is [19, 24),
>>>>
>>>>
>>>> BUT, because of the out-of-order arrival, we did not receive (15, 3) in
>>>> time but received (19, 5); this will cause us to mistakenly merge the
>>>> window [10, 20) with [19, 24) into [10, 24), and only when we later
>>>> receive (15, 3) do we realize that the previous window should have
>>>> ended at 18.
>>>>
>>>> Does that make sense to you?
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax <ma...@confluent.io>
>>>> wrote:
>>>>
>>>>> I cannot follow the example:
>>>>>
>>>>>>> (10, 10), (15, 3), (19, 5) ...
>>>>>
>>>>> First, [10,10] is created, second the window is extended to [10,15],
>> and
>>>>> third [19,19] is created. Why would there be a [15,15]? And why would
>>>>> (19,5) be merged into [15,15] -- the gap was set to 3 via (15,3) and
>>>>> thus [19,19] should be its own window?
>>>>>
>>>>>> Take a look at another example,
>>>>>> (13, 3),  (19, 5), (15, 3) ...
>>>>>>
>>>>>> in this case when (15, 3) is received, [13,13] should be retrieved and
>>>>>> merged to a new window [13, 15], then [19,19] should be updated to
>> [13,
>>>>>> 19]. Correct?
>>>>>
>>>>> This example makes sense. However, Guozhang's example was different.
>> The
>>>>> late event _reduces_ the gap, and this can lead to a window split.
>>>>> Guozhang's example was
>>>>>
>>>>>>>> (10, 10), (19, 5), (15, 3) ...
>>>>>
>>>>> First [10,10] is created, second [10,19] is create (gap is 10, so 10
>> and
>>>>> 19 merge). Last, (15,3) reduced the gap from 10 to 3, thus [10,15] and
>>>>> [19,19] must be two windows, ie, original window [10,19] must be split.
>>>>>
>>>>>
>>>>> Or maybe you have different semantics about how gaps are dynamically
>> modified
>>>>> in mind? It's a little unclear from the KIP itself what semantics
>> dynamic
>>>>> session windows should have.
>>>>>
>>>>>
>>>>> What is also unclear to me atm is, what use cases you have in mind? The
>>>>> KIP only says
>>>>>
>>>>>> the statistical aggregation result, liquidity of the records,
>>>>>
>>>>>
>>>>> I am not sure what this means. Can you elaborate?
>>>>>
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>>
>>>>> On 8/30/18 3:32 PM, Lei Chen wrote:
>>>>>> Hi Guozhang,
>>>>>>
>>>>>> Thanks for reviewing the proposal. I didn't think of out of order
>>>> events
>>>>>> and glad that you brought it up.
>>>>>>
>>>>>> In the example you gave,
>>>>>>
>>>>>> (10, 10), (19, 5), (15, 3) ...
>>>>>>
>>>>>> my understanding is that the correct result window should be the same
>>>> as
>>>>> in
>>>>>> order events
>>>>>>
>>>>>> (10, 10), (15, 3), (19, 5) ...
>>>>>>
>>>>>> when (15, 3) is received, [15,15] is created
>>>>>> when (19, 5) is received, [15, 15] and [19, 19] are merged and [15,
>> 19]
>>>>> is
>>>>>> created, meanwhile [15,15] is removed
>>>>>>
>>>>>> back to out of order case,
>>>>>>
>>>>>> when (19 ,5) is received, [19, 19] is created
>>>>>> when (15, 3) is received, in order to generate the same result,
>>>>>> 1. if late event is later than retention period, it will be dropped
>>>>>> 2. otherwise, adjacent session windows within gap should be retrieved
>>>> and
>>>>>> merged accordingly, in this case [19, 19], and create a new session
>>>> [15,
>>>>> 19]
>>>>>> I'm a little confused when you said "the window [15, 15] SHOULD actually
>>>> be
>>>>>> expired at 18 and hence the next record (19, 5) should be for a new
>>>>> session
>>>>>> already." If I understand it correctly, the expiration of the window
>>>> is
>>>>>> only checked when the next event (19,5) comes, and then it should be merged
>>>> to
>>>>>> it. [15, 15] will then be closed. Is that also what you meant?
>>>>>> I cannot think of a case where a window will be split by a late event,
>>>>>> because if events A and C fall into the same session window, a late event
>>>>>> B in the middle will definitely fall into C's gap as well. IOW, a late
>>>>>> event will only cause a window extension, not a split.
>>>>>>
>>>>>> Take a look at another example,
>>>>>> (13, 3),  (19, 5), (15, 3) ...
>>>>>>
>>>>>> in this case when (15, 3) is received, [13,13] should be retrieved and
>>>>>> merged to a new window [13, 15], then [19,19] should be updated to
>> [13,
>>>>>> 19]. Correct?
>>>>>>
>>>>>> To be able to achieve that, like you said, the gap needs to be stored
>>>> for
>>>>>> sessions. We don't need to save the gap with each event, but only for
>>>>> each
>>>>>> session window. To avoid upgrading existing session window, how about
>>>>>> create a new Window type extended from SessionWindow along with a new
>>>>>> KeySchema?
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Lei
>>>>>>
>>>>>>
>>>>>> On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <wa...@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>>> Hello Lei,
>>>>>>>
>>>>>>> Thanks for the proposal. I've just made a quick pass over it and
>> there
>>>>> is a
>>>>>>> question I have:
>>>>>>>
>>>>>>> The session windows are defined per key, i.e. does that mean that
>> each
>>>>>>> incoming record of the key can dynamically change the gap of the
>>>> window?
>>>>>>> For example, say you have the following record for the same key
>> coming
>>>>> in
>>>>>>> order, where the first time is the timestamp of the record, and the
>>>>> second
>>>>>>> value is the extracted gap value:
>>>>>>>
>>>>>>> (10, 10), (19, 5), ...
>>>>>>>
>>>>>>>
>>>>>>> When we receive the first record at time 10, the gap is extracted as
>>>> 10,
>>>>>>> and hence the window will be expired at 20 if no other record is
>>>>> received.
>>>>>>> When we receive the second record at time 19, the gap is modified to
>>>> 5,
>>>>> and
>>>>>>> hence the window will be expired at 24 if no other record is
>> received.
>>>>>>>
>>>>>>>
>>>>>>> If that's the case, I'm wondering how out-of-order data can be
>> handled
>>>>>>> then, consider this stream:
>>>>>>>
>>>>>>> (10, 10), (19, 5), (15, 3) ...
>>>>>>>
>>>>>>> I.e. you received a late record indicating at timestamp 15, which
>>>>> shorten
>>>>>>> the gap to 3. It means that the window SHOULD actually be expired at
>>>> 18,
>>>>>>> and hence the next record (19, 5) should be for a new session
>> already.
>>>>>>> Today Streams session window implementation does not do "window
>>>> split",
>>>>> so
>>>>>>> have you thought about how this can be extended?
>>>>>>>
>>>>>>> Also since in your proposal each session window's gap value would be
>>>>>>> different, we need to store this value along with each record then,
>>>> how
>>>>>>> would we store it, and what would be the upgrade path if it is not a
>>>>>>> compatible change on disk storage etc?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <le...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I created a KIP to add dynamic gap session window support to Kafka
>>>>>>> Streams
>>>>>>>> DSL.
>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>> 362%3A+Support+dynamic+gap+session+window
>>>>>>>>
>>>>>>>> Please take a look,
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Lei
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>
>>
>>
> 


Re: [DISCUSS] KIP-362: Dynamic Session Window Support

Posted by Lei Chen <le...@gmail.com>.
Thanks Matthias. That makes sense.

You're right that symmetric merge is necessary to ensure consistency. On
the other hand, I kinda feel it defeats the purpose of the dynamic gap, which
is to update the gap from the old value to the new value. The symmetric merge
always honors the larger gap in both directions, rather than honoring the gap
carried by the record with the larger timestamp. I wasn't able to find any semantic
definitions w.r.t. this particular aspect online, but spent some time
looking into other streaming engines like Apache Flink.

Apache Flink defines the window differently: it uses (start time, start
time + gap).

so our previous example (10, 10), (19,5),(15,3) in Flink's case will be:
[10,20]
[19,24] => merged to [10,24]
[15,18] => merged to [10,24]

while example (15,3)(19,5)(10,10) will be
[15,18]
[19,24] => no merge
[10,20] => merged to [10,24]

however, since it only records the gap in the future direction, not the past, a late
record might not trigger any merge where in symmetric merge it would.
(7,2),(10, 10), (19,5),(15,3)
[7,9]
[10,20]
[19,24] => merged to [10,24]
[15,18] => merged to [10,24]
so at the end
two windows [7,9][10,24] are there.

As you can see, in Flink, the gap semantics lean toward a gap carried by
one record only affecting how that record merges with future records;
e.g. a later event (T2, G2) will only be merged with (T1, G1) if
T2 is less than T1+G1, but not when T1 is less than T2 - G2. Let's call
this the "forward-merge" way of handling it. I just went through some source
code, so if my understanding of Flink's implementation is incorrect,
please correct me.

On the other hand, if we want to do symmetric merge in Kafka Streams, we
can change the window definition to [start time - gap, start time + gap].
This way the example (7,2),(10, 10), (19,5),(15,3) will be
[5,9]
[0,20] => merged to [0,20]
[14,24] => merged to [0,24]
[12,18] => merged to [0,24]

 (19,5),(15,3)(7,2),(10, 10) will generate same result
[14,24]
[12,18] => merged to [12,24]
[5,9] => no merge
[0,20] => merged to [0,24]
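To make the two semantics concrete, the worked examples above can be reproduced with a small sketch (illustrative Python, not the Kafka Streams API; `merge_windows` is a hypothetical helper):

```python
def merge_windows(records, symmetric):
    """Compute session windows for a stream of (timestamp, gap) records.

    Each record claims an interval; overlapping intervals are merged.
    symmetric=False -> interval [t, t + gap]        (Flink-style forward gap)
    symmetric=True  -> interval [t - gap, t + gap]  (gap applied both ways)
    """
    windows = []
    for t, g in records:
        new = [t - g, t + g] if symmetric else [t, t + g]
        # Keep merging until the new interval overlaps no remaining window.
        merged_any = True
        while merged_any:
            merged_any = False
            for w in windows:
                if w[0] <= new[1] and new[0] <= w[1]:
                    new = [min(new[0], w[0]), max(new[1], w[1])]
                    windows.remove(w)
                    merged_any = True
                    break
        windows.append(new)
    return sorted(tuple(w) for w in windows)


# Forward-merge: the late record (15, 3) does not reach back to [7, 9].
assert merge_windows([(7, 2), (10, 10), (19, 5), (15, 3)],
                     symmetric=False) == [(7, 9), (10, 24)]

# Symmetric merge: both orderings give the same single window.
assert merge_windows([(7, 2), (10, 10), (19, 5), (15, 3)],
                     symmetric=True) == [(0, 24)]
assert merge_windows([(19, 5), (15, 3), (7, 2), (10, 10)],
                     symmetric=True) == [(0, 24)]
```

Note how the (7,2) record stays a separate window [7,9] under forward-merge but joins the rest under symmetric merge, matching the hand-computed results above.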

Note that symmetric-merge would require us to change the way Kafka
Streams fetches windows now: instead of fetching the range from timestamp-gap to
timestamp+gap, we would need to fetch all windows that are not expired yet.
On the other hand, I'm not sure how this will impact the current logic of
how a window is considered closed, because the window wouldn't carry the end
timestamp anymore, but the end timestamp + gap.

So do you think the forward-merge approach used by Flink makes more sense
in Kafka Streams, or does symmetric-merge make more sense? Both of them seem
to me to give deterministic results.

BTW, I'll add the use case to the original KIP.

Lei


On Tue, Sep 11, 2018 at 5:45 PM Matthias J. Sax <ma...@confluent.io>
wrote:

> Thanks for explaining your understanding. And thanks for providing more
> details about the use-case. Maybe you can add this to the KIP?
>
>
> First one general comment. I guess that my and Guozhang's understanding
> about gap/close/gracePeriod is the same as yours -- we might not have
> used the terms precisely correctly in previous emails.
>
>
> To your semantics of gap in detail:
>
> > I thought when (15,3) is received, kafka streams look up for neighbor
> > record/window that is within the gap
> > of [15-3, 15+3], and merge if any. Previous record (10, 10) created its
> own
> > window [10, 10], which is
> > out of the gap, so nothing will be found and no merge occurs. Hence we
> have
> > two windows now in session store,
> > [10, 10] and [15, 15] respectively.
>
> If you have record (10,10), we currently create a window of size
> [10,10]. When record (15,3) arrives, your observation is that the gap 3 is
> too small to merge it into the [10,10] window -- however, merging is a
> symmetric operation and the existing window of [10,10] has a gap of 10
> defined: thus, 15 is close enough to fall into the gap, and (15,3) is
> merged into the existing window, resulting in window [10,15].
>
> If we don't respect the gap both ways, we end up with inconsistencies if
> data is out-of-order. For example, if we use the same input record
> (10,10) and (15,3) from above, and it happens that (15,3) is processed
> first, when processing out-of-order record (10,10) we would want to
> merge both into a single window, too?
>
> Does this make sense?
>
> Now the question remains: if two records with different gap parameters
> are merged, which gap should we apply for processing/merging future
> records into the window? It seems that we should use the gap parameter
> from the record with the largest timestamp. In the example above, (15,3).
> We would use gap 3 after merging, independent of the order of processing.
>
>
> > Also another thing worth mentioning is that, the session window object
> > created in current kafka streams
> > implementation doesn't have gap info, it has start and end, which is the
> > earliest and latest event timestamp
> > in that window interval, i.e for (10,10), the session window gets created
> > is [10,10], rather than [10,20]. Just to clarify
> > so that it's clear why (10,10) cannot be fetched when looking for gap of
> > (15,3), it's because the end boundary 10 of
> > [10,10] is smaller than search boundary [12,18].
>
> We don't need to store the gap, because the gap is known from the window
> definition. The created window size depends on the data that is
> contained in the window. I guess one could define it differently, too,
> i.e., for the (10,10) record, we create a window [0,20] -- not sure if it
> makes a big difference in practice though. Note that creating window
> [10,20] would not be correct, because the gap must be applied in both
> directions, not just into the future.
>
> About the second part: the search would not be applied from (15,3) in
> range [12,18], but from existing window [10,10] into range [0,20], and 15
> is contained there. This example also shows that we would need to come
> up with a clever way to identify window [10,10] when processing (15,3)
> -- not sure atm how to do this. However, only considering (15,3) would
> result in inconsistencies for out-of-order data as pointed out above and
> would not be sufficient.
>
>
> Does this make sense?
>
>
> Or is there another way to define dynamic session gap semantics in a
> deterministic way with regard to out-of-order data?
>
>
>
> -Matthias
>
>
> On 9/11/18 4:28 PM, Lei Chen wrote:
> > Thanks Matthias and Guozhang for the response.
> >
> > Seems like our understanding mainly differs in the semantics of gap in
> > session windows.
> >
> > My understanding is that the gap is used to merge nearby records together
> > such that no two records in the merged window are farther apart than the
> > gap. In Kafka Streams's implementation it's mainly used to find neighboring
> > records/windows in the session store so that nearby records can be merged.
> > It is NOT used to determine when a window should be closed, which is in
> > fact determined by the window's grace period.
> >
> > Guozhang you said "b. When later we received (15, 3), it means that this
> > record ** changed **
> > the window gap interval from 10 to 3, and hence we received a new record
> at
> > 15, with the new window gap of 3, it means that by timestamp 18 (15 + 3)
> if
> > we have not received any new data, the window should be closed, i.e. the
> > window is now [10, 18) which includes two records at 10 and 15."
> >
> > This is different from what i thought will happen.
> >
> > I thought when (15,3) is received, kafka streams look up for neighbor
> > record/window that is within the gap
> > of [15-3, 15+3], and merge if any. Previous record (10, 10) created its
> own
> > window [10, 10], which is
> > out of the gap, so nothing will be found and no merge occurs. Hence we
> have
> > two windows now in session store,
> > [10, 10] and [15, 15] respectively.
> >
> > Also another thing worth mentioning is that, the session window object
> > created in current kafka streams
> > implementation doesn't have gap info, it has start and end, which is the
> > earliest and latest event timestamp
> > in that window interval, i.e for (10,10), the session window gets created
> > is [10,10], rather than [10,20]. Just to clarify
> > so that it's clear why (10,10) cannot be fetched when looking for gap of
> > (15,3), it's because the end boundary 10 of
> > [10,10] is smaller than search boundary [12,18].
> >
> > Please correct me if my understanding is wrong here.
> >
> > @Matthias, to answer your use case question, we have a use case where
> > asynchronous time series data are received in the stream, from different
> > contributors, with different quality and at a different pace.
> > Inside Kafka Streams, we use state to maintain statistical aggregations and
> > other mathematical models to track
> > the liquidity and calculate the time decay rate and dynamic gap, so that at
> > runtime, for each contributor we can
> > 1. determine how many historical records we should maintain in state.
> > 2. for each incoming record, output a record using aggregations from
> > *nearby* records from that contributor.
> > Why doesn't a fixed-gap session window work here? Because the definition of
> > "nearby" is determined by several very dynamic factors in our case; it
> > changes not only with different hours of the day, but is also related to
> > other contributors.
> > The purpose of this KIP is to suggest a dynamic session window
> > implementation so that we can embed such a dynamic "nearby" calculation
> > capability into Kafka Streams session window semantics. Hope it makes
> > sense to you.
> >
> > Lei
> >
> >
> >
> >
> > On Mon, Sep 10, 2018 at 5:27 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> >> Hello Lei,
> >>
> >> As Matthias mentioned, the key question here is that because of late
> >> arrivals of records which may indicate a shorter session gap interval,
> >> some session windows may be "mistakenly" merged, and hence the merge
> >> needs to be undone, i.e. they must be split again.
> >>
> >> Back to my example, you are right that the processing result of
> >>
> >> (10, 10), (19, 5), (15, 3) ..
> >>
> >> should be the same as the processing result of
> >>
> >> (10, 10), (15, 3), (19, 5) ..
> >>
> >> Note that the second value is NOT the window end time, but the extracted
> >> window gap interval, as you suggested in the KIP this value can be
> >> dynamically changed
> >>
> >> a. If you take a look at the second ordering, when we receive (10, 10)
> it
> >> means a window starting at 10 is created, and its gap interval is 10,
> which
> >> means that if by the timestamp of 20 we do not receive any new data,
> then
> >> the window should be closed, i.e. the window [10, 20).
> >>
> >> b. When later we received (15, 3), it means that this record ** changed
> **
> >> the window gap interval from 10 to 3, and hence we received a new
> record at
> >> 15, with the new window gap of 3, it means that by timestamp 18 (15 +
> 3) if
> >> we have not received any new data, the window should be closed, i.e. the
> >> window is now [10, 18) which includes two records at 10 and 15.
> >>
> >> c. The third record is received at 19, which is after the window close
> time
> >> 18, it means that we should now start a new window starting at 19, i.e.
> the
> >> window is [19, 24),
> >>
> >>
> >> BUT, because of the out of ordering, we did not receive (15, 3) in time,
> >> but received (19, 5), it will cause us to mistakenly merge the window of
> >> [10, 20) with [19, 24) to [10, 24), and only when later we received
> (15, 3)
> >> we realized that the previous window should have been ended at 18.
> >>
> >> Does that make sense to you?
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax <ma...@confluent.io>
> >> wrote:
> >>
> >>> I cannot follow the example:
> >>>
> >>>>> (10, 10), (15, 3), (19, 5) ...
> >>>
> >>> First, [10,10] is created, second the window is extended to [10,15],
> and
> >>> third [19,19] is created. Why would there be a [15,15]? And why would
> >>> (19,5) be merged into [15,15] -- the gap was set to 3 via (15,3) and
> >>> thus [19,19] should be its own window?
> >>>
> >>>> Take a look at another example,
> >>>> (13, 3),  (19, 5), (15, 3) ...
> >>>>
> >>>> in this case when (15, 3) is received, [13,13] should be retrieved and
> >>>> merged to a new window [13, 15], then [19,19] should be updated to
> [13,
> >>>> 19]. Correct?
> >>>
> >>> This example makes sense. However, Guozhang's example was different.
> The
> >>> late even, _reduces_ the gap and this can lead to a window split.
> >>> Guozhang's example was
> >>>
> >>>>>> (10, 10), (19, 5), (15, 3) ...
> >>>
> >>> First [10,10] is created, second [10,19] is create (gap is 10, so 10
> and
> >>> 19 merge). Last, (15,3) reduced the gap from 10 to 3, thus [10,15] and
> >>> [19,19] must be two windows, ie, original window [10,19] must be split.
> >>>
> >>>
> >>> Or maybe you have different semantic about gaps are dynamically
> modified
> >>> in mind? It's a little unclear for the KIP itself what semantics
> dynamic
> >>> sessions windows should have.
> >>>
> >>>
> >>> What is also unclear to me atm is, what use cases you have in mind? The
> >>> KIP only says
> >>>
> >>>> the statistical aggregation result, liquidity of the records,
> >>>
> >>>
> >>> I am not sure what this means. Can you elaborate?
> >>>
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>> On 8/30/18 3:32 PM, Lei Chen wrote:
> >>>> Hi Guozhang,
> >>>>
> >>>> Thanks for reviewing the proposal. I didn't think of out of order
> >> events
> >>>> and glad that you brought it up.
> >>>>
> >>>> In the example you gave,
> >>>>
> >>>> (10, 10), (19, 5), (15, 3) ...
> >>>>
> >>>> my understanding is that the correct result window should be the same
> >> as
> >>> in
> >>>> order events
> >>>>
> >>>> (10, 10), (15, 3), (19, 5) ...
> >>>>
> >>>> when (15, 3) is received, [15,15] is creatd
> >>>> when (19, 5) is received, [15, 15] and [19, 19] are merged and [15,
> 19]
> >>> is
> >>>> created, meanwhile [15,15] is removed
> >>>>
> >>>> back to out of order case,
> >>>>
> >>>> when (19 ,5) is received, [19, 19] is created
> >>>> when (15, 3) is received, in order to generate the same result,
> >>>> 1. if late event is later than retention period, it will be dropped
> >>>> 2. otherwise, adjacent session windows within gap should be retrieved
> >> and
> >>>> merged accordingly, in this case [19, 19], and create a new session
> >> [15,
> >>> 19]
> >>>> I'm little confused when you said "the window [15, 15] SHOULD actually
> >> be
> >>>> expired at 18 and hence the next record (19, 5) should be for a new
> >>> session
> >>>> already.". If i understand it correctly, the expiration of the window
> >> is
> >>>> only checked when next event (19,5) comes and then it should be merged
> >> to
> >>>> it. [15, 15] will then be closed. Is that also what you meant?
> >>>> I cannot think of a case where a window will be split by a late event,
> >>>> because if event A and C fall into the same session window, a late
> >> event
> >>> B
> >>>> in middle will definitely fall into C's gap as well. IOW, late event
> >> will
> >>>> only cause window extension, not split.
> >>>>
> >>>> Take a look at another example,
> >>>> (13, 3),  (19, 5), (15, 3) ...
> >>>>
> >>>> in this case when (15, 3) is received, [13,13] should be retrieved and
> >>>> merged to a new window [13, 15], then [19,19] should be updated to
> [13,
> >>>> 19]. Correct?
> >>>>
> >>>> To be able to achieve that, like you said, the gap needs to be stored
> >> for
> >>>> sessions. We don't need to save the gap with each event, but only for
> >>> each
> >>>> session window. To avoid upgrading existing session window, how about
> >>>> create a new Window type extended from SessionWindow along with a new
> >>>> KeySchema?
> >>>>
> >>>> What do you think?
> >>>>
> >>>> Lei
> >>>>
> >>>>
> >>>> On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <wa...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> Hello Lei,
> >>>>>
> >>>>> Thanks for the proposal. I've just made a quick pass over it and
> there
> >>> is a
> >>>>> question I have:
> >>>>>
> >>>>> The session windows are defined per key, i.e. does that mean that
> each
> >>>>> incoming record of the key can dynamically change the gap of the
> >> window?
> >>>>> For example, say you have the following record for the same key
> coming
> >>> in
> >>>>> order, where the first time is the timestamp of the record, and the
> >>> second
> >>>>> value is the extracted gap value:
> >>>>>
> >>>>> (10, 10), (19, 5), ...
> >>>>>
> >>>>>
> >>>>> When we receive the first record at time 10, the gap is extracted as
> >> 10,
> >>>>> and hence the window will be expired at 20 if no other record is
> >>> received.
> >>>>> When we receive the second record at time 19, the gap is modified to
> >> 5,
> >>> and
> >>>>> hence the window will be expired at 24 if no other record is
> received.
> >>>>>
> >>>>>
> >>>>> If that's the case, I'm wondering how out-of-order data can be
> handled
> >>>>> then, consider this stream:
> >>>>>
> >>>>> (10, 10), (19, 5), (15, 3) ...
> >>>>>
> >>>>> I.e. you received a late record indicating at timestamp 15, which
> >>> shorten
> >>>>> the gap to 3. It means that the window SHOULD actually be expired at
> >> 18,
> >>>>> and hence the next record (19, 5) should be for a new session
> already.
> >>>>> Today Streams session window implementation does not do "window
> >> split",
> >>> so
> >>>>> have you thought about how this can be extended?
> >>>>>
> >>>>> Also since in your proposal each session window's gap value would be
> >>>>> different, we need to store this value along with each record then,
> >> how
> >>>>> would we store it, and what would be the upgrade path if it is not a
> >>>>> compatible change on disk storage etc?
> >>>>>
> >>>>>
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <le...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi All,
> >>>>>>
> >>>>>> I created a KIP to add dynamic gap session window support to Kafka
> >>>>> Streams
> >>>>>> DSL.
> >>>>>>
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>>>>> 362%3A+Support+dynamic+gap+session+window
> >>>>>>
> >>>>>> Please take a look,
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Lei
> >>>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>
>

Re: [DISCUSS] KIP-362: Dynamic Session Window Support

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for explaining your understanding. And thanks for providing more
details about the use-case. Maybe you can add this to the KIP?


First one general comment: I guess that my and Guozhang's
understanding about gap/close/gracePeriod is the same as yours -- we
might not have used the terms precisely in previous emails.


To your semantics of gap in detail:

> I thought when (15,3) is received, kafka streams look up for neighbor
> record/window that is within the gap
> of [15-3, 15+3], and merge if any. Previous record (10, 10) created its own
> window [10, 10], which is
> out of the gap, so nothing will be found and no merge occurs. Hence we have
> two windows now in session store,
> [10, 10] and [15, 15] respectively.

If you have record (10,10), we currently create a window of size
[10,10]. When record (15,3) arrives, your observation is that the gap
3 is too small for it to be merged into the [10,10] window -- however,
merging is a symmetric operation, and the existing window [10,10] has
a gap of 10 defined: thus, 15 is close enough to fall into that gap,
and (15,3) is merged into the existing window, resulting in window
[10,15].

If we don't respect the gap both ways, we end up with inconsistencies
if data is out-of-order. For example, if we use the same input records
(10,10) and (15,3) from above, and it happens that (15,3) is processed
first, then when processing the out-of-order record (10,10) we would
want to merge both into a single window, too?

Does this make sense?

Now the question remains: if two records with different gap
parameters are merged, which gap should we apply for
processing/merging future records into the window? It seems that we
should use the gap parameter from the record with the largest
timestamp. In the example above, that is (15,3): we would use gap 3
after merging, independent of the order of processing.


> Also another thing worth mentioning is that, the session window object
> created in current kafka streams
> implementation doesn't have gap info, it has start and end, which is the
> earliest and latest event timestamp
> in that window interval, i.e for (10,10), the session window gets created
> is [10,10], rather than [10,20]. Just to clarify
> so that it's clear why (10,10) cannot be fetched when looking for gap of
> (15,3), it's because the end boundary 10 of
> [10,10] is smaller than search boundary [12,18].

We don't need to store the gap, because the gap is known from the
window definition. The created window size depends on the data that is
contained in the window. I guess one could define it differently, too,
i.e., for the (10,10) record, we create a window [0,20] -- not sure if
it makes a big difference in practice though. Note that creating
window [10,20] would not be correct, because the gap must be applied
in both directions, not just into the future.

About the second part: the search would not be applied from (15,3) in
range [12,18], but from the existing window [10,10] into range [0,20],
and 15 is contained there. This example also shows that we would need
to come up with a clever way to identify window [10,10] when
processing (15,3) -- I am not sure atm how to do this. However,
considering only (15,3) would result in inconsistencies for
out-of-order data, as pointed out above, and would not be sufficient.


Does this make sense?


Or is there another way to define dynamic session gap semantics in a
deterministic way with regard to out-of-order data?



-Matthias


On 9/11/18 4:28 PM, Lei Chen wrote:
> Thanks Matthias and Guozhang for the response.
> 
> Seems like our understanding mainly differs in the semantics of gap in
> session windows.
> 
> My understanding is that gap is used to merge nearby records together such
> that no record
> in the merged window has distance later than gap. In Kafka Streams's
> implementation it's
> mainly used to find neighbor records/windows in session store so that
> nearby records can
> be merge. It is NOT used to determine when a window should be closed, which
> is in
> fact determined by window's grace period.
> 
> Guozhang you said "b. When later we received (15, 3), it means that this
> record ** changed **
> the window gap interval from 10 to 3, and hence we received a new record at
> 15, with the new window gap of 3, it means that by timestamp 18 (15 + 3) if
> we have not received any new data, the window should be closed, i.e. the
> window is now [10, 18) which includes two records at 10 and 15."
> 
> This is different from what i thought will happen.
> 
> I thought when (15,3) is received, kafka streams look up for neighbor
> record/window that is within the gap
> of [15-3, 15+3], and merge if any. Previous record (10, 10) created its own
> window [10, 10], which is
> out of the gap, so nothing will be found and no merge occurs. Hence we have
> two windows now in session store,
> [10, 10] and [15, 15] respectively.
> 
> Also another thing worth mentioning is that, the session window object
> created in current kafka streams
> implementation doesn't have gap info, it has start and end, which is the
> earliest and latest event timestamp
> in that window interval, i.e for (10,10), the session window gets created
> is [10,10], rather than [10,20]. Just to clarify
> so that it's clear why (10,10) cannot be fetched when looking for gap of
> (15,3), it's because the end boundary 10 of
> [10,10] is smaller than search boundary [12,18].
> 
> Please correct me if my understanding is wrong here.
> 
> @Matthias, to answer your use case question, we have an use case where
> asynchronous time series data
> are received in the stream, from different contributors, with different
> quality and at different pace.
> Inside Kafka Streams, we use state to maintain statistic aggregations and
> other mathematics model to track
> the liquidity and calculate time decay rate and dynamic gap, so that at
> runtime, for each contributor we can
> 1. determine how many historical records we should maintain in state.
> 2. for each incoming record, output a record using aggregations from
> *nearby* records from that contributor.
> Why fixed gap session window doesn't work here? Because the definition of
> "nearby" here is determined by
> several very dynamic factors in our case, it changes not only with
> different hours in a day, but also related to
> other contributors.
> The purpose of this KIP is to suggest a dynamic session window
> implementation so that we can embed such
> dynamic "nearby" calculation capability into kafka streams session windows
> semantics. Hope it makes sense to you.
> 
> Lei
> 
> 
> 
> 
> On Mon, Sep 10, 2018 at 5:27 PM Guozhang Wang <wa...@gmail.com> wrote:
> 
>> Hello Lei,
>>
>> As Matthias mentioned, the key question here is that because of the late
>> arrivals of records which may indicate a shorter session gap interval, some
>> session windows may be "mistakenly" merged and hence need to be undone the
>> merge, i.e. to split them again.
>>
>> Back to my example, you are right that the processing result of
>>
>> (10, 10), (19, 5), (15, 3) ..
>>
>> should be the same as the processing result of
>>
>> (10, 10), (15, 3), (19, 5) ..
>>
>> Note that the second value is NOT the window end time, but the extracted
>> window gap interval, as you suggested in the KIP this value can be
>> dynamically changed
>>
>> a. If you take a look at the second ordering, when we receive (10, 10) it
>> means a window starting at 10 is created, and its gap interval is 10, which
>> means that if by the timestamp of 20 we do not receive any new data, then
>> the window should be closed, i.e. the window [10, 20).
>>
>> b. When later we received (15, 3), it means that this record ** changed **
>> the window gap interval from 10 to 3, and hence we received a new record at
>> 15, with the new window gap of 3, it means that by timestamp 18 (15 + 3) if
>> we have not received any new data, the window should be closed, i.e. the
>> window is now [10, 18) which includes two records at 10 and 15.
>>
>> c. The third record is received at 19, which is after the window close time
>> 18, it means that we should now start a new window starting at 19, i.e. the
>> window is [19, 24),
>>
>>
>> BUT, because of the out of ordering, we did not receive (15, 3) in time,
>> but received (19, 5), it will cause us to mistakenly merge the window of
>> [10, 20) with [19, 24) to [10, 24), and only when later we received (15, 3)
>> we realized that the previous window should have been ended at 18.
>>
>> Does that make sense to you?
>>
>>
>> Guozhang
>>
>>
>> On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax <ma...@confluent.io>
>> wrote:
>>
>>> I cannot follow the example:
>>>
>>>>> (10, 10), (15, 3), (19, 5) ...
>>>
>>> First, [10,10] is created, second the window is extended to [10,15], and
>>> third [19,19] is created. Why would there be a [15,15]? And why would
>>> (19,5) be merged into [15,15] -- the gap was set to 3 via (15,3) and
>>> thus [19,19] should be its own window?
>>>
>>>> Take a look at another example,
>>>> (13, 3),  (19, 5), (15, 3) ...
>>>>
>>>> in this case when (15, 3) is received, [13,13] should be retrieved and
>>>> merged to a new window [13, 15], then [19,19] should be updated to [13,
>>>> 19]. Correct?
>>>
>>> This example makes sense. However, Guozhang's example was different. The
>>> late even, _reduces_ the gap and this can lead to a window split.
>>> Guozhang's example was
>>>
>>>>>> (10, 10), (19, 5), (15, 3) ...
>>>
>>> First [10,10] is created, second [10,19] is create (gap is 10, so 10 and
>>> 19 merge). Last, (15,3) reduced the gap from 10 to 3, thus [10,15] and
>>> [19,19] must be two windows, ie, original window [10,19] must be split.
>>>
>>>
>>> Or maybe you have different semantic about gaps are dynamically modified
>>> in mind? It's a little unclear for the KIP itself what semantics dynamic
>>> sessions windows should have.
>>>
>>>
>>> What is also unclear to me atm is, what use cases you have in mind? The
>>> KIP only says
>>>
>>>> the statistical aggregation result, liquidity of the records,
>>>
>>>
>>> I am not sure what this means. Can you elaborate?
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>>
>>> On 8/30/18 3:32 PM, Lei Chen wrote:
>>>> Hi Guozhang,
>>>>
>>>> Thanks for reviewing the proposal. I didn't think of out of order
>> events
>>>> and glad that you brought it up.
>>>>
>>>> In the example you gave,
>>>>
>>>> (10, 10), (19, 5), (15, 3) ...
>>>>
>>>> my understanding is that the correct result window should be the same
>> as
>>> in
>>>> order events
>>>>
>>>> (10, 10), (15, 3), (19, 5) ...
>>>>
>>>> when (15, 3) is received, [15,15] is creatd
>>>> when (19, 5) is received, [15, 15] and [19, 19] are merged and [15, 19]
>>> is
>>>> created, meanwhile [15,15] is removed
>>>>
>>>> back to out of order case,
>>>>
>>>> when (19 ,5) is received, [19, 19] is created
>>>> when (15, 3) is received, in order to generate the same result,
>>>> 1. if late event is later than retention period, it will be dropped
>>>> 2. otherwise, adjacent session windows within gap should be retrieved
>> and
>>>> merged accordingly, in this case [19, 19], and create a new session
>> [15,
>>> 19]
>>>> I'm little confused when you said "the window [15, 15] SHOULD actually
>> be
>>>> expired at 18 and hence the next record (19, 5) should be for a new
>>> session
>>>> already.". If i understand it correctly, the expiration of the window
>> is
>>>> only checked when next event (19,5) comes and then it should be merged
>> to
>>>> it. [15, 15] will then be closed. Is that also what you meant?
>>>> I cannot think of a case where a window will be split by a late event,
>>>> because if event A and C fall into the same session window, a late
>> event
>>> B
>>>> in middle will definitely fall into C's gap as well. IOW, late event
>> will
>>>> only cause window extension, not split.
>>>>
>>>> Take a look at another example,
>>>> (13, 3),  (19, 5), (15, 3) ...
>>>>
>>>> in this case when (15, 3) is received, [13,13] should be retrieved and
>>>> merged to a new window [13, 15], then [19,19] should be updated to [13,
>>>> 19]. Correct?
>>>>
>>>> To be able to achieve that, like you said, the gap needs to be stored
>> for
>>>> sessions. We don't need to save the gap with each event, but only for
>>> each
>>>> session window. To avoid upgrading existing session window, how about
>>>> create a new Window type extended from SessionWindow along with a new
>>>> KeySchema?
>>>>
>>>> What do you think?
>>>>
>>>> Lei
>>>>
>>>>
>>>> On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <wa...@gmail.com>
>>> wrote:
>>>>
>>>>> Hello Lei,
>>>>>
>>>>> Thanks for the proposal. I've just made a quick pass over it and there
>>> is a
>>>>> question I have:
>>>>>
>>>>> The session windows are defined per key, i.e. does that mean that each
>>>>> incoming record of the key can dynamically change the gap of the
>> window?
>>>>> For example, say you have the following record for the same key coming
>>> in
>>>>> order, where the first time is the timestamp of the record, and the
>>> second
>>>>> value is the extracted gap value:
>>>>>
>>>>> (10, 10), (19, 5), ...
>>>>>
>>>>>
>>>>> When we receive the first record at time 10, the gap is extracted as
>> 10,
>>>>> and hence the window will be expired at 20 if no other record is
>>> received.
>>>>> When we receive the second record at time 19, the gap is modified to
>> 5,
>>> and
>>>>> hence the window will be expired at 24 if no other record is received.
>>>>>
>>>>>
>>>>> If that's the case, I'm wondering how out-of-order data can be handled
>>>>> then, consider this stream:
>>>>>
>>>>> (10, 10), (19, 5), (15, 3) ...
>>>>>
>>>>> I.e. you received a late record indicating at timestamp 15, which
>>> shorten
>>>>> the gap to 3. It means that the window SHOULD actually be expired at
>> 18,
>>>>> and hence the next record (19, 5) should be for a new session already.
>>>>> Today Streams session window implementation does not do "window
>> split",
>>> so
>>>>> have you thought about how this can be extended?
>>>>>
>>>>> Also since in your proposal each session window's gap value would be
>>>>> different, we need to store this value along with each record then,
>> how
>>>>> would we store it, and what would be the upgrade path if it is not a
>>>>> compatible change on disk storage etc?
>>>>>
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <le...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I created a KIP to add dynamic gap session window support to Kafka
>>>>> Streams
>>>>>> DSL.
>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>> 362%3A+Support+dynamic+gap+session+window
>>>>>>
>>>>>> Please take a look,
>>>>>>
>>>>>> Thanks,
>>>>>> Lei
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
>>>>
>>>
>>>
>>
>>
>> --
>> -- Guozhang
>>
> 


Re: [DISCUSS] KIP-362: Dynamic Session Window Support

Posted by Lei Chen <le...@gmail.com>.
Thanks Matthias and Guozhang for the response.

Seems like our understanding mainly differs in the semantics of gap in
session windows.

My understanding is that the gap is used to merge nearby records
together such that no record in the merged window is farther than the
gap from its neighbor. In Kafka Streams's implementation it's mainly
used to find neighboring records/windows in the session store so that
nearby records can be merged. It is NOT used to determine when a
window should be closed, which is in fact determined by the window's
grace period.

Guozhang, you said: "b. When later we received (15, 3), it means that this
record ** changed **
the window gap interval from 10 to 3, and hence we received a new record at
15, with the new window gap of 3, it means that by timestamp 18 (15 + 3) if
we have not received any new data, the window should be closed, i.e. the
window is now [10, 18) which includes two records at 10 and 15."

This is different from what I thought would happen.

I thought that when (15,3) is received, Kafka Streams looks up any
neighboring record/window within the gap range [15-3, 15+3] and
merges it if one is found. The previous record (10, 10) created its
own window [10, 10], which is outside that range, so nothing will be
found and no merge occurs. Hence we now have two windows in the
session store, [10, 10] and [15, 15] respectively.

Also another thing worth mentioning is that the session window object
created in the current Kafka Streams implementation doesn't carry gap
info; it has a start and an end, which are the earliest and latest
event timestamps in that window interval, i.e., for (10,10) the
session window that gets created is [10,10], rather than [10,20]. Just
to clarify why (10,10) cannot be fetched when looking for the gap of
(15,3): the end boundary 10 of [10,10] is smaller than the lower
search boundary 12 of the search range [12,18].
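To spell out that lookup, here is a simplified model (names are mine,
not the actual Streams internals) in the spirit of
SessionStore#findSessions(key, ts - gap, ts + gap):

```python
def find_mergeable(sessions, ts, gap):
    """sessions: list of (start, end) windows stored for one key.

    A window qualifies for merging with record (ts, gap) only if its
    end >= ts - gap and its start <= ts + gap.
    """
    return [(s, e) for (s, e) in sessions
            if e >= ts - gap and s <= ts + gap]

# (10, 10) created window [10, 10]; the late record (15, 3) searches
# [12, 18] and misses it, because the window end 10 < 12:
print(find_mergeable([(10, 10)], 15, 3))  # []
```

By contrast, a window [13, 13] would be found for (15, 3), since its
end 13 falls inside the search range [12, 18].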

Please correct me if my understanding is wrong here.

@Matthias, to answer your use-case question: we have a use case where
asynchronous time-series data are received in the stream, from
different contributors, with different quality and at a different
pace. Inside Kafka Streams, we use state to maintain statistical
aggregations and other mathematical models to track liquidity and to
calculate the time decay rate and the dynamic gap, so that at runtime,
for each contributor, we can
1. determine how many historical records we should maintain in state.
2. for each incoming record, output a record using aggregations from
*nearby* records from that contributor.
Why doesn't a fixed-gap session window work here? Because the
definition of "nearby" is determined by several very dynamic factors
in our case: it changes not only with the hour of the day, but also
depends on other contributors.
The purpose of this KIP is to suggest a dynamic session window
implementation so that we can embed such dynamic "nearby" calculation
capability into Kafka Streams session window semantics. Hope it makes
sense to you.

Lei




On Mon, Sep 10, 2018 at 5:27 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hello Lei,
>
> As Matthias mentioned, the key question here is that because of the late
> arrivals of records which may indicate a shorter session gap interval, some
> session windows may be "mistakenly" merged and hence need to be undone the
> merge, i.e. to split them again.
>
> Back to my example, you are right that the processing result of
>
> (10, 10), (19, 5), (15, 3) ..
>
> should be the same as the processing result of
>
> (10, 10), (15, 3), (19, 5) ..
>
> Note that the second value is NOT the window end time, but the extracted
> window gap interval, as you suggested in the KIP this value can be
> dynamically changed
>
> a. If you take a look at the second ordering, when we receive (10, 10) it
> means a window starting at 10 is created, and its gap interval is 10, which
> means that if by the timestamp of 20 we do not receive any new data, then
> the window should be closed, i.e. the window [10, 20).
>
> b. When later we received (15, 3), it means that this record ** changed **
> the window gap interval from 10 to 3, and hence we received a new record at
> 15, with the new window gap of 3, it means that by timestamp 18 (15 + 3) if
> we have not received any new data, the window should be closed, i.e. the
> window is now [10, 18) which includes two records at 10 and 15.
>
> c. The third record is received at 19, which is after the window close time
> 18, it means that we should now start a new window starting at 19, i.e. the
> window is [19, 24),
>
>
> BUT, because of the out of ordering, we did not receive (15, 3) in time,
> but received (19, 5), it will cause us to mistakenly merge the window of
> [10, 20) with [19, 24) to [10, 24), and only when later we received (15, 3)
> we realized that the previous window should have been ended at 18.
>
> Does that make sense to you?
>
>
> Guozhang
>
>
> On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > I cannot follow the example:
> >
> > >> (10, 10), (15, 3), (19, 5) ...
> >
> > First, [10,10] is created, second the window is extended to [10,15], and
> > third [19,19] is created. Why would there be a [15,15]? And why would
> > (19,5) be merged into [15,15] -- the gap was set to 3 via (15,3) and
> > thus [19,19] should be its own window?
> >
> > > Take a look at another example,
> > > (13, 3),  (19, 5), (15, 3) ...
> > >
> > > in this case when (15, 3) is received, [13,13] should be retrieved and
> > > merged to a new window [13, 15], then [19,19] should be updated to [13,
> > > 19]. Correct?
> >
> > This example makes sense. However, Guozhang's example was different. The
> > late even, _reduces_ the gap and this can lead to a window split.
> > Guozhang's example was
> >
> > >>> (10, 10), (19, 5), (15, 3) ...
> >
> > First [10,10] is created, second [10,19] is create (gap is 10, so 10 and
> > 19 merge). Last, (15,3) reduced the gap from 10 to 3, thus [10,15] and
> > [19,19] must be two windows, ie, original window [10,19] must be split.
> >
> >
> > Or maybe you have different semantic about gaps are dynamically modified
> > in mind? It's a little unclear for the KIP itself what semantics dynamic
> > sessions windows should have.
> >
> >
> > What is also unclear to me atm is, what use cases you have in mind? The
> > KIP only says
> >
> > > the statistical aggregation result, liquidity of the records,
> >
> >
> > I am not sure what this means. Can you elaborate?
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 8/30/18 3:32 PM, Lei Chen wrote:
> > > Hi Guozhang,
> > >
> > > Thanks for reviewing the proposal. I didn't think of out of order
> events
> > > and glad that you brought it up.
> > >
> > > In the example you gave,
> > >
> > > (10, 10), (19, 5), (15, 3) ...
> > >
> > > my understanding is that the correct result window should be the same
> as
> > in
> > > order events
> > >
> > > (10, 10), (15, 3), (19, 5) ...
> > >
> > > when (15, 3) is received, [15,15] is creatd
> > > when (19, 5) is received, [15, 15] and [19, 19] are merged and [15, 19]
> > is
> > > created, meanwhile [15,15] is removed
> > >
> > > back to out of order case,
> > >
> > > when (19 ,5) is received, [19, 19] is created
> > > when (15, 3) is received, in order to generate the same result,
> > > 1. if late event is later than retention period, it will be dropped
> > > 2. otherwise, adjacent session windows within gap should be retrieved
> and
> > > merged accordingly, in this case [19, 19], and create a new session
> [15,
> > 19]
> > > I'm little confused when you said "the window [15, 15] SHOULD actually
> be
> > > expired at 18 and hence the next record (19, 5) should be for a new
> > session
> > > already.". If i understand it correctly, the expiration of the window
> is
> > > only checked when next event (19,5) comes and then it should be merged
> to
> > > it. [15, 15] will then be closed. Is that also what you meant?
> > > I cannot think of a case where a window will be split by a late event,
> > > because if event A and C fall into the same session window, a late
> event
> > B
> > > in middle will definitely fall into C's gap as well. IOW, late event
> will
> > > only cause window extension, not split.
> > >
> > > Take a look at another example,
> > > (13, 3),  (19, 5), (15, 3) ...
> > >
> > > in this case when (15, 3) is received, [13,13] should be retrieved and
> > > merged to a new window [13, 15], then [19,19] should be updated to [13,
> > > 19]. Correct?
> > >
> > > To be able to achieve that, like you said, the gap needs to be stored
> for
> > > sessions. We don't need to save the gap with each event, but only for
> > each
> > > session window. To avoid upgrading existing session window, how about
> > > create a new Window type extended from SessionWindow along with a new
> > > KeySchema?
> > >
> > > What do you think?
> > >
> > > Lei
> > >
> > >
> > > On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > >> Hello Lei,
> > >>
> > >> Thanks for the proposal. I've just made a quick pass over it and there
> > is a
> > >> question I have:
> > >>
> > >> The session windows are defined per key, i.e. does that mean that each
> > >> incoming record of the key can dynamically change the gap of the
> window?
> > >> For example, say you have the following record for the same key coming
> > in
> > >> order, where the first time is the timestamp of the record, and the
> > second
> > >> value is the extracted gap value:
> > >>
> > >> (10, 10), (19, 5), ...
> > >>
> > >>
> > >> When we receive the first record at time 10, the gap is extracted as
> 10,
> > >> and hence the window will be expired at 20 if no other record is
> > received.
> > >> When we receive the second record at time 19, the gap is modified to
> 5,
> > and
> > >> hence the window will be expired at 24 if no other record is received.
> > >>
> > >>
> > >> If that's the case, I'm wondering how out-of-order data can be handled
> > >> then, consider this stream:
> > >>
> > >> (10, 10), (19, 5), (15, 3) ...
> > >>
> > >> I.e. you received a late record indicating at timestamp 15, which
> > shorten
> > >> the gap to 3. It means that the window SHOULD actually be expired at
> 18,
> > >> and hence the next record (19, 5) should be for a new session already.
> > >> Today Streams session window implementation does not do "window
> split",
> > so
> > >> have you thought about how this can be extended?
> > >>
> > >> Also since in your proposal each session window's gap value would be
> > >> different, we need to store this value along with each record then,
> how
> > >> would we store it, and what would be the upgrade path if it is not a
> > >> compatible change on disk storage etc?
> > >>
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >>
> > >> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <le...@gmail.com> wrote:
> > >>
> > >>> Hi All,
> > >>>
> > >>> I created a KIP to add dynamic gap session window support to Kafka
> > >> Streams
> > >>> DSL.
> > >>>
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>> 362%3A+Support+dynamic+gap+session+window
> > >>>
> > >>> Please take a look,
> > >>>
> > >>> Thanks,
> > >>> Lei
> > >>>
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> >
> >
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-362: Dynamic Session Window Support

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Lei,

As Matthias mentioned, the key question here is that, because of late
arrivals of records which may indicate a shorter session gap interval, some
session windows may be "mistakenly" merged; the merge then needs to be
undone, i.e. those windows must be split again.

Back to my example, you are right that the processing result of

(10, 10), (19, 5), (15, 3) ...

should be the same as the processing result of

(10, 10), (15, 3), (19, 5) ...

Note that the second value is NOT the window end time, but the extracted
window gap interval; as you suggested in the KIP, this value can be
dynamically changed.

a. If you take a look at the second ordering, when we receive (10, 10) it
means a window starting at 10 is created, and its gap interval is 10, which
means that if by the timestamp of 20 we do not receive any new data, then
the window should be closed, i.e. the window [10, 20).

b. When we later receive (15, 3), this record ** changes ** the window gap
interval from 10 to 3. Since the new record arrives at 15 with a gap of 3,
the window should be closed if no new data is received by timestamp 18
(15 + 3), i.e. the window is now [10, 18) and includes the two records at
10 and 15.

c. The third record is received at 19, which is after the window close time
of 18, so we should now start a new window at 19, i.e. the window is
[19, 24).


BUT, because of the out-of-order arrival, we did not receive (15, 3) in
time and instead received (19, 5) first, which causes us to mistakenly
merge the window [10, 20) with [19, 24) into [10, 24). Only when we later
receive (15, 3) do we realize that the previous window should have ended
at 18.
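To make the two orderings concrete, here is a tiny standalone Java sketch
(all class, field, and method names below are illustrative and NOT Kafka
Streams API). It assumes the gap of the record with the greatest timestamp
governs when the open session closes, and that a late record merely merges
without re-applying its smaller gap -- which reproduces exactly the
mistaken merge described above:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only -- none of these names come from Kafka Streams.
// Each record is (timestamp, extractedGap); the gap of the record with the
// greatest timestamp decides when the open session closes.
public class DynamicSessionSketch {
    static class Window {
        long start, end, gap;                 // end = greatest record timestamp seen
        Window(long ts, long g) { start = ts; end = ts; gap = g; }
        long closeAt() { return end + gap; }  // session closes here if no newer record
    }

    static List<Window> process(long[][] records) {
        List<Window> windows = new ArrayList<>();
        Window cur = null;
        for (long[] r : records) {
            long ts = r[0], gap = r[1];
            if (cur != null && ts < cur.closeAt()) {
                if (ts >= cur.end) {           // in-order record: extend, adopt its gap
                    cur.end = ts;
                    cur.gap = gap;
                }                              // late record: merged, but its smaller gap
                                               // is NOT re-applied -> no split happens
            } else {
                cur = new Window(ts, gap);     // outside the gap: start a new session
                windows.add(cur);
            }
        }
        return windows;
    }

    public static void main(String[] args) {
        // In order: (10,10), (15,3), (19,5) -> [10,15] closes at 18, then [19,19]
        List<Window> inOrder = process(new long[][]{{10, 10}, {15, 3}, {19, 5}});
        // Out of order: (10,10), (19,5), (15,3) -> mistakenly one window [10,19],
        // which under the correct semantics would have to be split at 18.
        List<Window> outOfOrder = process(new long[][]{{10, 10}, {19, 5}, {15, 3}});
        System.out.println(inOrder.size() + " vs " + outOfOrder.size()); // prints "2 vs 1"
    }
}
```

The in-order replay yields two sessions ([10,15] and [19,19]), while the
out-of-order replay collapses them into a single [10,19], showing why a
correct implementation would need window-split support.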

Does that make sense to you?


Guozhang


On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> I cannot follow the example:
>
> >> (10, 10), (15, 3), (19, 5) ...
>
> First, [10,10] is created, second the window is extended to [10,15], and
> third [19,19] is created. Why would there be a [15,15]? And why would
> (19,5) be merged into [15,15] -- the gap was set to 3 via (15,3) and
> thus [19,19] should be its own window?
>
> > Take a look at another example,
> > (13, 3),  (19, 5), (15, 3) ...
> >
> > in this case when (15, 3) is received, [13,13] should be retrieved and
> > merged to a new window [13, 15], then [19,19] should be updated to [13,
> > 19]. Correct?
>
> This example makes sense. However, Guozhang's example was different. The
> late event _reduces_ the gap, and this can lead to a window split.
> Guozhang's example was
>
> >>> (10, 10), (19, 5), (15, 3) ...
>
> First [10,10] is created, second [10,19] is created (gap is 10, so 10 and
> 19 merge). Last, (15,3) reduced the gap from 10 to 3, thus [10,15] and
> [19,19] must be two windows, ie, original window [10,19] must be split.
>
>
> Or maybe you have a different semantic in mind for how gaps are
> dynamically modified? It's a little unclear from the KIP itself what
> semantics dynamic session windows should have.
>
>
> What is also unclear to me atm is, what use cases you have in mind? The
> KIP only says
>
> > the statistical aggregation result, liquidity of the records,
>
>
> I am not sure what this means. Can you elaborate?
>
>
>
> -Matthias
>
>
>
> On 8/30/18 3:32 PM, Lei Chen wrote:
> > Hi Guozhang,
> >
> > Thanks for reviewing the proposal. I didn't think of out of order events
> > and glad that you brought it up.
> >
> > In the example you gave,
> >
> > (10, 10), (19, 5), (15, 3) ...
> >
> > my understanding is that the correct result window should be the same as
> in
> > order events
> >
> > (10, 10), (15, 3), (19, 5) ...
> >
> > when (15, 3) is received, [15,15] is created
> > when (19, 5) is received, [15, 15] and [19, 19] are merged and [15, 19]
> is
> > created, meanwhile [15,15] is removed
> >
> > back to out of order case,
> >
> > when (19 ,5) is received, [19, 19] is created
> > when (15, 3) is received, in order to generate the same result,
> > 1. if late event is later than retention period, it will be dropped
> > 2. otherwise, adjacent session windows within gap should be retrieved and
> > merged accordingly, in this case [19, 19], and create a new session [15,
> 19]
> > I'm a little confused when you said "the window [15, 15] SHOULD actually be
> > expired at 18 and hence the next record (19, 5) should be for a new
> session
> > already.". If i understand it correctly, the expiration of the window is
> > only checked when next event (19,5) comes and then it should be merged to
> > it. [15, 15] will then be closed. Is that also what you meant?
> > I cannot think of a case where a window will be split by a late event,
> > because if event A and C fall into the same session window, a late event
> B
> > in the middle will definitely fall into C's gap as well. IOW, a late event will
> > only cause window extension, not split.
> >
> > Take a look at another example,
> > (13, 3),  (19, 5), (15, 3) ...
> >
> > in this case when (15, 3) is received, [13,13] should be retrieved and
> > merged to a new window [13, 15], then [19,19] should be updated to [13,
> > 19]. Correct?
> >
> > To be able to achieve that, like you said, the gap needs to be stored for
> > sessions. We don't need to save the gap with each event, but only for
> each
> > session window. To avoid upgrading existing session window, how about
> > create a new Window type extended from SessionWindow along with a new
> > KeySchema?
> >
> > What do you think?
> >
> > Lei
> >
> >
> > On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> >> Hello Lei,
> >>
> >> Thanks for the proposal. I've just made a quick pass over it and there
> is a
> >> question I have:
> >>
> >> The session windows are defined per key, i.e. does that mean that each
> >> incoming record of the key can dynamically change the gap of the window?
> >> For example, say you have the following record for the same key coming
> in
> >> order, where the first time is the timestamp of the record, and the
> second
> >> value is the extracted gap value:
> >>
> >> (10, 10), (19, 5), ...
> >>
> >>
> >> When we receive the first record at time 10, the gap is extracted as 10,
> >> and hence the window will be expired at 20 if no other record is
> received.
> >> When we receive the second record at time 19, the gap is modified to 5,
> and
> >> hence the window will be expired at 24 if no other record is received.
> >>
> >>
> >> If that's the case, I'm wondering how out-of-order data can be handled
> >> then, consider this stream:
> >>
> >> (10, 10), (19, 5), (15, 3) ...
> >>
> >> I.e. you received a late record indicating at timestamp 15, which
> shorten
> >> the gap to 3. It means that the window SHOULD actually be expired at 18,
> >> and hence the next record (19, 5) should be for a new session already.
> >> Today Streams session window implementation does not do "window split",
> so
> >> have you thought about how this can be extended?
> >>
> >> Also since in your proposal each session window's gap value would be
> >> different, we need to store this value along with each record then, how
> >> would we store it, and what would be the upgrade path if it is not a
> >> compatible change on disk storage etc?
> >>
> >>
> >>
> >> Guozhang
> >>
> >>
> >>
> >> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <le...@gmail.com> wrote:
> >>
> >>> Hi All,
> >>>
> >>> I created a KIP to add dynamic gap session window support to Kafka
> >> Streams
> >>> DSL.
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>> 362%3A+Support+dynamic+gap+session+window
> >>>
> >>> Please take a look,
> >>>
> >>> Thanks,
> >>> Lei
> >>>
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>
>


-- 
-- Guozhang