You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Alexander Demidko <al...@stitchfix.com> on 2017/01/05 01:16:17 UTC

Kafka Streams window retention period question

Hi folks,

I'm experimenting with Kafka Streams windowed aggregation and came across
window retention period behavior I don't fully understand.
I'm using custom timestamp extractor which gets the timestamp from the
payload. Values are aggregated using tumbling time windows and summed by
the key.
I am using kafka and kafka-streams with 0.10.1.1 version.

Full code can be found at
https://gist.github.com/xdralex/845bcf8f06ab0cfcf9785d9f95450b88, but in
general I'm doing the following:

val input: KStream[String, String] = builder.stream(Serdes.String(),
Serdes.String(), "TimeInputTopic")
val window: Windows[TimeWindow] =
TimeWindows.of(60000).advanceBy(60000).until(30000)

val aggregated: KTable[Windowed[String], JInt] = input
  .mapValues((v: String) => parse(v)._2)
  .groupByKey(Serdes.String(), Serdes.Integer())
  .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window,
"TimeStore1")

aggregated.foreach {
  (w: Windowed[String], s: JInt) =>
    val start = new DateTime(w.window().start(), DateTimeZone.UTC)
    val end = new DateTime(w.window().end(), DateTimeZone.UTC)
    println(s"Aggregated: $start..$end - ${w.key()} - $s")
}

Here is the data being sent to TimeInputTopic:
a,1970-01-01T00:00:00Z 1
b,1970-01-01T00:00:01Z 2
a,1970-01-01T00:00:02Z 3
b,1970-01-01T00:05:01Z 10
b,1970-01-01T00:00:15Z 10

Here is the output:
Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - a - 4
Aggregated: 1970-01-01T00:05:00.000Z..1970-01-01T00:06:00.000Z - b - 10
Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 12

Here is what confuses me.
I would expect that once an event <b,05:01 10> is received, it should
update some sort of "current" time value (watermark?), and, because 05:01 is
bigger than 00:00..01:00 + 30 seconds of retention, either:

A) Drop the bucket 00:00:00..00:01:00, and once an event <b,00:15 10> is
received, recreate this bucket. This means there would be two outputs for
00:00..01:00:
Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 10
Not sure about this behavior because
http://docs.confluent.io/3.1.1/streams/developer-guide.html#windowing-a-stream
is saying: "Kafka Streams guarantees to keep a window for *at least this
specified time*". So I guess the window can be kept longer...


B) Just drop the incoming event <b,00:15 10> altogether. In this case there
would be only one output for 00:00..01:00:
Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
I could expect this because
http://docs.confluent.io/3.1.1/streams/concepts.html#windowing is saying:
"If a record arrives after the retention period has passed, the record
cannot be processed and is dropped".


Hope all this makes sense. Few questions:

– If the window can be kept in store longer, are there any thresholds when
it will finally be purged? For example, it would be nice to flush old
buckets if they are taking too much space.

– How is "current" time value updated / how do Kafka Streams decide that
the retention period has passed? Does it maintain a watermark with the
biggest time seen?

– What is the right mailing list to ask questions about Kafka Streams? I
had a choice between this one and Confluent Platform list, and given that
open source part of CP consists from patched vanilla Kafka, was not sure
where to write.

Thanks,
Alex

Re: Kafka Streams window retention period question

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi Alexander,

first, both mailing list should be fine :)

About internal time tracking: Kafka Streams tracks an internal "stream
time" that is determined as the minimum "partition time" over all its
input partitions.

The "partition time" is tracked for each input partition individually
and is the minimum timestamp of all currently buffered record of the
partition.

So depending on how many records from which partitions are fetch on
poll() "stream time" gets advanced accordingly -- this is kinda
non-deterministic because we cannot predict what poll() will return.

Because all buffered records are considered, "stream time" is advance
conservatively. The main idea about this is to keep windows open longer
if we know in advance that there will be a late record (as we observe
the late record in the buffer already). Thus, we can compute "more
correct" results with regard to late arriving records.
(Just a heads up, we might change this behavior in future releases --
thus, it is not documented anywhere but in the code ;) )



About retention time and purging windows: old windows should be dropped
after "stream time" advances beyond the point on which it is guaranteed
to maintain the window. It should happen "as soon as possible" but there
is no strict guarantee (thus "kept at least").

Furthermore, Streams applies a minimum retention time of 1 minute --
thus, for your specific use case, the 30 seconds you do specify are not
used (this is unfortunately not documented :( ). However, this in
unrelated to the behavior you see -- I just mention it for completeness.


Thus for you specific use case, streams time is most likely not advance
to 5:01 when record <b,05:01 10> is processed (as record with TS=0:15 is
most likely in the buffer) and thus, the next b-record with TS=15
seconds will be added the the still open (first) b-window and both
values 2 and 10 get added to 12.


Also keep in mind that we do some deduplication on KTable result using
an internal cache. This can also influence what output record you see.
For further details see:
http://docs.confluent.io/current/streams/developer-guide.html#memory-management



-Matthias

On 1/4/17 5:16 PM, Alexander Demidko wrote:
> Hi folks,
> 
> I'm experimenting with Kafka Streams windowed aggregation and came across
> window retention period behavior I don't fully understand.
> I'm using custom timestamp extractor which gets the timestamp from the
> payload. Values are aggregated using tumbling time windows and summed by
> the key.
> I am using kafka and kafka-streams with 0.10.1.1 version.
> 
> Full code can be found at
> https://gist.github.com/xdralex/845bcf8f06ab0cfcf9785d9f95450b88, but in
> general I'm doing the following:
> 
> val input: KStream[String, String] = builder.stream(Serdes.String(),
> Serdes.String(), "TimeInputTopic")
> val window: Windows[TimeWindow] =
> TimeWindows.of(60000).advanceBy(60000).until(30000)
> 
> val aggregated: KTable[Windowed[String], JInt] = input
>   .mapValues((v: String) => parse(v)._2)
>   .groupByKey(Serdes.String(), Serdes.Integer())
>   .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window,
> "TimeStore1")
> 
> aggregated.foreach {
>   (w: Windowed[String], s: JInt) =>
>     val start = new DateTime(w.window().start(), DateTimeZone.UTC)
>     val end = new DateTime(w.window().end(), DateTimeZone.UTC)
>     println(s"Aggregated: $start..$end - ${w.key()} - $s")
> }
> 
> Here is the data being sent to TimeInputTopic:
> a,1970-01-01T00:00:00Z 1
> b,1970-01-01T00:00:01Z 2
> a,1970-01-01T00:00:02Z 3
> b,1970-01-01T00:05:01Z 10
> b,1970-01-01T00:00:15Z 10
> 
> Here is the output:
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - a - 4
> Aggregated: 1970-01-01T00:05:00.000Z..1970-01-01T00:06:00.000Z - b - 10
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 12
> 
> Here is what confuses me.
> I would expect that once an event <b,05:01 10> is received, it should
> update some sort of "current" time value (watermark?), and, because 05:01 is
> bigger than 00:00..01:00 + 30 seconds of retention, either:
> 
> A) Drop the bucket 00:00:00..00:01:00, and once an event <b,00:15 10> is
> received, recreate this bucket. This means there would be two outputs for
> 00:00..01:00:
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 10
> Not sure about this behavior because
> http://docs.confluent.io/3.1.1/streams/developer-guide.html#windowing-a-stream
> is saying: "Kafka Streams guarantees to keep a window for *at least this
> specified time*". So I guess the window can be kept longer...
> 
> 
> B) Just drop the incoming event <b,00:15 10> altogether. In this case there
> would be only one output for 00:00..01:00:
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> I could expect this because
> http://docs.confluent.io/3.1.1/streams/concepts.html#windowing is saying:
> "If a record arrives after the retention period has passed, the record
> cannot be processed and is dropped".
> 
> 
> Hope all this makes sense. Few questions:
> 
> – If the window can be kept in store longer, are there any thresholds when
> it will finally be purged? For example, it would be nice to flush old
> buckets if they are taking too much space.
> 
> – How is "current" time value updated / how do Kafka Streams decide that
> the retention period has passed? Does it maintain a watermark with the
> biggest time seen?
> 
> – What is the right mailing list to ask questions about Kafka Streams? I
> had a choice between this one and Confluent Platform list, and given that
> open source part of CP consists from patched vanilla Kafka, was not sure
> where to write.
> 
> Thanks,
> Alex
> 


Re: Kafka Streams window retention period question

Posted by Alexander Demidko <al...@stitchfix.com>.
Great, thanks for the info guys!

On Thu, Jan 5, 2017 at 10:09 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Hi Alex,
>
> if a window was purged because its retention time passed it will not
> accept any records anymore -- thus, if a very late record arrives, it
> will get dropped without any further notice.
>
> About stream time and partition: yes. And how time is advanced/tracked
> in independent for the window type.
>
>
> -Matthias
>
> On 1/5/17 9:14 PM, Alexander Demidko wrote:
> > Hi Matthias,
> >
> > Thanks for such a thorough response!
> >
> > I guess there are cases when a determinism might be preferred over
> > computing "more correct results" (e.g. in unit tests, where one manually
> > lays out an order of incoming events and wants to get an exact output),
> but
> > from now on I can simply assume that windows might be stored longer than
> > the specified time.
> >
> > Few more questions if you don't mind.
> >
> > - What should happen when the window 00:00..01:00 will finally get purged
> > (and the internal stream time will get bumped to say time 10:00) but
> then I
> > receive an event <b,00:15 10>? Will it create the 00:00..01:00 window
> again
> > or the event will be dropped because it's way older than the internal
> > stream time?
> >
> > - I got a bit confused when you mentioned a key in the window name "open
> > (first) >>>b<<<-window". To make it clear – I assume that because in
> Kafka
> > Streams hopping/tumbling windows are aligned, an internal stream time is
> > not related to the aggregation keys but just to the input partitions,
> > right? I.e. if I have only one partition there will be only one internal
> > stream time watermark regardless of how many keys do I have? Will this
> > behavior be the same for sliding windows? Feel free to just point me to
> the
> > code :)
> >
> > Alex
> >
> >
> >> Hi Alexander,
> >>
> >> first, both mailing list should be fine :)
> >>
> >> About internal time tracking: Kafka Streams tracks an internal "stream
> >> time" that is determined as the minimum "partition time" over all its
> >> input partitions.
> >>
> >> The "partition time" is tracked for each input partition individually
> >> and is the minimum timestamp of all currently buffered record of the
> >> partition.
> >>
> >> So depending on how many records from which partitions are fetch on
> >> poll() "stream time" gets advanced accordingly -- this is kinda
> >> non-deterministic because we cannot predict what poll() will return.
> >>
> >> Because all buffered records are considered, "stream time" is advance
> >> conservatively. The main idea about this is to keep windows open longer
> >> if we know in advance that there will be a late record (as we observe
> >> the late record in the buffer already). Thus, we can compute "more
> >> correct" results with regard to late arriving records.
> >> (Just a heads up, we might change this behavior in future releases --
> >> thus, it is not documented anywhere but in the code ;) )
> >>
> >>
> >>
> >> About retention time and purging windows: old windows should be dropped
> >> after "stream time" advances beyond the point on which it is guaranteed
> >> to maintain the window. It should happen "as soon as possible" but there
> >> is no strict guarantee (thus "kept at least").
> >>
> >> Furthermore, Streams applies a minimum retention time of 1 minute --
> >> thus, for your specific use case, the 30 seconds you do specify are not
> >> used (this is unfortunately not documented :( ). However, this in
> >> unrelated to the behavior you see -- I just mention it for completeness.
> >>
> >>
> >> Thus for you specific use case, streams time is most likely not advance
> >> to 5:01 when record <b,05:01 10> is processed (as record with TS=0:15 is
> >> most likely in the buffer) and thus, the next b-record with TS=15
> >> seconds will be added the the still open (first) b-window and both
> >> values 2 and 10 get added to 12.
> >>
> >>
> >> Also keep in mind that we do some deduplication on KTable result using
> >> an internal cache. This can also influence what output record you see.
> >> For further details see:
> >>
> > http://docs.confluent.io/current/streams/developer-
> guide.html#memory-management
> >
> >
> >> -Matthias
> >
> >
> > On Wed, Jan 4, 2017 at 5:16 PM, Alexander Demidko <
> > alexander.demidko@stitchfix.com> wrote:
> >
> >> Hi folks,
> >>
> >> I'm experimenting with Kafka Streams windowed aggregation and came
> across
> >> window retention period behavior I don't fully understand.
> >> I'm using custom timestamp extractor which gets the timestamp from the
> >> payload. Values are aggregated using tumbling time windows and summed by
> >> the key.
> >> I am using kafka and kafka-streams with 0.10.1.1 version.
> >>
> >> Full code can be found at https://gist.github.com/xdralex/
> >> 845bcf8f06ab0cfcf9785d9f95450b88, but in general I'm doing the
> following:
> >>
> >> val input: KStream[String, String] = builder.stream(Serdes.String(),
> >> Serdes.String(), "TimeInputTopic")
> >> val window: Windows[TimeWindow] = TimeWindows.of(60000).
> >> advanceBy(60000).until(30000)
> >>
> >> val aggregated: KTable[Windowed[String], JInt] = input
> >>   .mapValues((v: String) => parse(v)._2)
> >>   .groupByKey(Serdes.String(), Serdes.Integer())
> >>   .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window,
> >> "TimeStore1")
> >>
> >> aggregated.foreach {
> >>   (w: Windowed[String], s: JInt) =>
> >>     val start = new DateTime(w.window().start(), DateTimeZone.UTC)
> >>     val end = new DateTime(w.window().end(), DateTimeZone.UTC)
> >>     println(s"Aggregated: $start..$end - ${w.key()} - $s")
> >> }
> >>
> >> Here is the data being sent to TimeInputTopic:
> >> a,1970-01-01T00:00:00Z 1
> >> b,1970-01-01T00:00:01Z 2
> >> a,1970-01-01T00:00:02Z 3
> >> b,1970-01-01T00:05:01Z 10
> >> b,1970-01-01T00:00:15Z 10
> >>
> >> Here is the output:
> >> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - a - 4
> >> Aggregated: 1970-01-01T00:05:00.000Z..1970-01-01T00:06:00.000Z - b - 10
> >> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 12
> >>
> >> Here is what confuses me.
> >> I would expect that once an event <b,05:01 10> is received, it should
> >> update some sort of "current" time value (watermark?), and, because
> 05:01 is
> >> bigger than 00:00..01:00 + 30 seconds of retention, either:
> >>
> >> A) Drop the bucket 00:00:00..00:01:00, and once an event <b,00:15 10> is
> >> received, recreate this bucket. This means there would be two outputs
> for
> >> 00:00..01:00:
> >> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> >> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 10
> >> Not sure about this behavior because http://docs.confluent.
> >> io/3.1.1/streams/developer-guide.html#windowing-a-stream is saying:
> >> "Kafka Streams guarantees to keep a window for *at least this specified
> >> time*". So I guess the window can be kept longer...
> >>
> >>
> >> B) Just drop the incoming event <b,00:15 10> altogether. In this case
> >> there would be only one output for 00:00..01:00:
> >> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> >> I could expect this because http://docs.confluent.
> >> io/3.1.1/streams/concepts.html#windowing is saying: "If a record
> arrives
> >> after the retention period has passed, the record cannot be processed
> and
> >> is dropped".
> >>
> >>
> >> Hope all this makes sense. Few questions:
> >>
> >> – If the window can be kept in store longer, are there any thresholds
> when
> >> it will finally be purged? For example, it would be nice to flush old
> >> buckets if they are taking too much space.
> >>
> >> – How is "current" time value updated / how do Kafka Streams decide that
> >> the retention period has passed? Does it maintain a watermark with the
> >> biggest time seen?
> >>
> >> – What is the right mailing list to ask questions about Kafka Streams? I
> >> had a choice between this one and Confluent Platform list, and given
> that
> >> open source part of CP consists from patched vanilla Kafka, was not sure
> >> where to write.
> >>
> >> Thanks,
> >> Alex
> >>
> >>
> >>
> >>
> >>
> >>
> >
>
>

Re: Kafka Streams window retention period question

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi Alex,

if a window was purged because its retention time passed it will not
accept any records anymore -- thus, if a very late record arrives, it
will get dropped without any further notice.

About stream time and partition: yes. And how time is advanced/tracked
in independent for the window type.


-Matthias

On 1/5/17 9:14 PM, Alexander Demidko wrote:
> Hi Matthias,
> 
> Thanks for such a thorough response!
> 
> I guess there are cases when a determinism might be preferred over
> computing "more correct results" (e.g. in unit tests, where one manually
> lays out an order of incoming events and wants to get an exact output), but
> from now on I can simply assume that windows might be stored longer than
> the specified time.
> 
> Few more questions if you don't mind.
> 
> - What should happen when the window 00:00..01:00 will finally get purged
> (and the internal stream time will get bumped to say time 10:00) but then I
> receive an event <b,00:15 10>? Will it create the 00:00..01:00 window again
> or the event will be dropped because it's way older than the internal
> stream time?
> 
> - I got a bit confused when you mentioned a key in the window name "open
> (first) >>>b<<<-window". To make it clear – I assume that because in Kafka
> Streams hopping/tumbling windows are aligned, an internal stream time is
> not related to the aggregation keys but just to the input partitions,
> right? I.e. if I have only one partition there will be only one internal
> stream time watermark regardless of how many keys do I have? Will this
> behavior be the same for sliding windows? Feel free to just point me to the
> code :)
> 
> Alex
> 
> 
>> Hi Alexander,
>>
>> first, both mailing list should be fine :)
>>
>> About internal time tracking: Kafka Streams tracks an internal "stream
>> time" that is determined as the minimum "partition time" over all its
>> input partitions.
>>
>> The "partition time" is tracked for each input partition individually
>> and is the minimum timestamp of all currently buffered record of the
>> partition.
>>
>> So depending on how many records from which partitions are fetch on
>> poll() "stream time" gets advanced accordingly -- this is kinda
>> non-deterministic because we cannot predict what poll() will return.
>>
>> Because all buffered records are considered, "stream time" is advance
>> conservatively. The main idea about this is to keep windows open longer
>> if we know in advance that there will be a late record (as we observe
>> the late record in the buffer already). Thus, we can compute "more
>> correct" results with regard to late arriving records.
>> (Just a heads up, we might change this behavior in future releases --
>> thus, it is not documented anywhere but in the code ;) )
>>
>>
>>
>> About retention time and purging windows: old windows should be dropped
>> after "stream time" advances beyond the point on which it is guaranteed
>> to maintain the window. It should happen "as soon as possible" but there
>> is no strict guarantee (thus "kept at least").
>>
>> Furthermore, Streams applies a minimum retention time of 1 minute --
>> thus, for your specific use case, the 30 seconds you do specify are not
>> used (this is unfortunately not documented :( ). However, this in
>> unrelated to the behavior you see -- I just mention it for completeness.
>>
>>
>> Thus for you specific use case, streams time is most likely not advance
>> to 5:01 when record <b,05:01 10> is processed (as record with TS=0:15 is
>> most likely in the buffer) and thus, the next b-record with TS=15
>> seconds will be added the the still open (first) b-window and both
>> values 2 and 10 get added to 12.
>>
>>
>> Also keep in mind that we do some deduplication on KTable result using
>> an internal cache. This can also influence what output record you see.
>> For further details see:
>>
> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
> 
> 
>> -Matthias
> 
> 
> On Wed, Jan 4, 2017 at 5:16 PM, Alexander Demidko <
> alexander.demidko@stitchfix.com> wrote:
> 
>> Hi folks,
>>
>> I'm experimenting with Kafka Streams windowed aggregation and came across
>> window retention period behavior I don't fully understand.
>> I'm using custom timestamp extractor which gets the timestamp from the
>> payload. Values are aggregated using tumbling time windows and summed by
>> the key.
>> I am using kafka and kafka-streams with 0.10.1.1 version.
>>
>> Full code can be found at https://gist.github.com/xdralex/
>> 845bcf8f06ab0cfcf9785d9f95450b88, but in general I'm doing the following:
>>
>> val input: KStream[String, String] = builder.stream(Serdes.String(),
>> Serdes.String(), "TimeInputTopic")
>> val window: Windows[TimeWindow] = TimeWindows.of(60000).
>> advanceBy(60000).until(30000)
>>
>> val aggregated: KTable[Windowed[String], JInt] = input
>>   .mapValues((v: String) => parse(v)._2)
>>   .groupByKey(Serdes.String(), Serdes.Integer())
>>   .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window,
>> "TimeStore1")
>>
>> aggregated.foreach {
>>   (w: Windowed[String], s: JInt) =>
>>     val start = new DateTime(w.window().start(), DateTimeZone.UTC)
>>     val end = new DateTime(w.window().end(), DateTimeZone.UTC)
>>     println(s"Aggregated: $start..$end - ${w.key()} - $s")
>> }
>>
>> Here is the data being sent to TimeInputTopic:
>> a,1970-01-01T00:00:00Z 1
>> b,1970-01-01T00:00:01Z 2
>> a,1970-01-01T00:00:02Z 3
>> b,1970-01-01T00:05:01Z 10
>> b,1970-01-01T00:00:15Z 10
>>
>> Here is the output:
>> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - a - 4
>> Aggregated: 1970-01-01T00:05:00.000Z..1970-01-01T00:06:00.000Z - b - 10
>> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 12
>>
>> Here is what confuses me.
>> I would expect that once an event <b,05:01 10> is received, it should
>> update some sort of "current" time value (watermark?), and, because 05:01 is
>> bigger than 00:00..01:00 + 30 seconds of retention, either:
>>
>> A) Drop the bucket 00:00:00..00:01:00, and once an event <b,00:15 10> is
>> received, recreate this bucket. This means there would be two outputs for
>> 00:00..01:00:
>> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
>> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 10
>> Not sure about this behavior because http://docs.confluent.
>> io/3.1.1/streams/developer-guide.html#windowing-a-stream is saying:
>> "Kafka Streams guarantees to keep a window for *at least this specified
>> time*". So I guess the window can be kept longer...
>>
>>
>> B) Just drop the incoming event <b,00:15 10> altogether. In this case
>> there would be only one output for 00:00..01:00:
>> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
>> I could expect this because http://docs.confluent.
>> io/3.1.1/streams/concepts.html#windowing is saying: "If a record arrives
>> after the retention period has passed, the record cannot be processed and
>> is dropped".
>>
>>
>> Hope all this makes sense. Few questions:
>>
>> – If the window can be kept in store longer, are there any thresholds when
>> it will finally be purged? For example, it would be nice to flush old
>> buckets if they are taking too much space.
>>
>> – How is "current" time value updated / how do Kafka Streams decide that
>> the retention period has passed? Does it maintain a watermark with the
>> biggest time seen?
>>
>> – What is the right mailing list to ask questions about Kafka Streams? I
>> had a choice between this one and Confluent Platform list, and given that
>> open source part of CP consists from patched vanilla Kafka, was not sure
>> where to write.
>>
>> Thanks,
>> Alex
>>
>>
>>
>>
>>
>>
> 


Re: Kafka Streams window retention period question

Posted by Sachin Mittal <sj...@gmail.com>.
What should happen when the window 00:00..01:00 will finally get purged
(and the internal stream time will get bumped to say time 10:00) but then I
receive an event <b,00:15 10>? Will it create the 00:00..01:00 window again
or the event will be dropped because it's way older than the internal
stream time?

Once window is finally purged and you get a late message for that window
again, it will create that window, aggregate single value to it and again
drop that window.
So this will be of not much use. Suggestion here is to keep the window
until sufficient time so that you don't get any late message.
Another thing I have found is that if until is long enough compaction (and
deletion in later release), will trigger only after until, and your
changelog topics will be huge. This may result in high local state restore
time if streams are resumed. Also in general I see more lag in case until
is set way high.

Thanks
Sachin



On Fri, Jan 6, 2017 at 10:44 AM, Alexander Demidko <
alexander.demidko@stitchfix.com> wrote:

> Hi Matthias,
>
> Thanks for such a thorough response!
>
> I guess there are cases when a determinism might be preferred over
> computing "more correct results" (e.g. in unit tests, where one manually
> lays out an order of incoming events and wants to get an exact output), but
> from now on I can simply assume that windows might be stored longer than
> the specified time.
>
> Few more questions if you don't mind.
>
> - What should happen when the window 00:00..01:00 will finally get purged
> (and the internal stream time will get bumped to say time 10:00) but then I
> receive an event <b,00:15 10>? Will it create the 00:00..01:00 window again
> or the event will be dropped because it's way older than the internal
> stream time?
>
> - I got a bit confused when you mentioned a key in the window name "open
> (first) >>>b<<<-window". To make it clear – I assume that because in Kafka
> Streams hopping/tumbling windows are aligned, an internal stream time is
> not related to the aggregation keys but just to the input partitions,
> right? I.e. if I have only one partition there will be only one internal
> stream time watermark regardless of how many keys do I have? Will this
> behavior be the same for sliding windows? Feel free to just point me to the
> code :)
>
> Alex
>
>
> > Hi Alexander,
> >
> > first, both mailing list should be fine :)
> >
> > About internal time tracking: Kafka Streams tracks an internal "stream
> > time" that is determined as the minimum "partition time" over all its
> > input partitions.
> >
> > The "partition time" is tracked for each input partition individually
> > and is the minimum timestamp of all currently buffered record of the
> > partition.
> >
> > So depending on how many records from which partitions are fetch on
> > poll() "stream time" gets advanced accordingly -- this is kinda
> > non-deterministic because we cannot predict what poll() will return.
> >
> > Because all buffered records are considered, "stream time" is advance
> > conservatively. The main idea about this is to keep windows open longer
> > if we know in advance that there will be a late record (as we observe
> > the late record in the buffer already). Thus, we can compute "more
> > correct" results with regard to late arriving records.
> > (Just a heads up, we might change this behavior in future releases --
> > thus, it is not documented anywhere but in the code ;) )
> >
> >
> >
> > About retention time and purging windows: old windows should be dropped
> > after "stream time" advances beyond the point on which it is guaranteed
> > to maintain the window. It should happen "as soon as possible" but there
> > is no strict guarantee (thus "kept at least").
> >
> > Furthermore, Streams applies a minimum retention time of 1 minute --
> > thus, for your specific use case, the 30 seconds you do specify are not
> > used (this is unfortunately not documented :( ). However, this in
> > unrelated to the behavior you see -- I just mention it for completeness.
> >
> >
> > Thus for you specific use case, streams time is most likely not advance
> > to 5:01 when record <b,05:01 10> is processed (as record with TS=0:15 is
> > most likely in the buffer) and thus, the next b-record with TS=15
> > seconds will be added the the still open (first) b-window and both
> > values 2 and 10 get added to 12.
> >
> >
> > Also keep in mind that we do some deduplication on KTable result using
> > an internal cache. This can also influence what output record you see.
> > For further details see:
> >
> http://docs.confluent.io/current/streams/developer-
> guide.html#memory-management
>
>
> > -Matthias
>
>
> On Wed, Jan 4, 2017 at 5:16 PM, Alexander Demidko <
> alexander.demidko@stitchfix.com> wrote:
>
> > Hi folks,
> >
> > I'm experimenting with Kafka Streams windowed aggregation and came across
> > window retention period behavior I don't fully understand.
> > I'm using custom timestamp extractor which gets the timestamp from the
> > payload. Values are aggregated using tumbling time windows and summed by
> > the key.
> > I am using kafka and kafka-streams with 0.10.1.1 version.
> >
> > Full code can be found at https://gist.github.com/xdralex/
> > 845bcf8f06ab0cfcf9785d9f95450b88, but in general I'm doing the
> following:
> >
> > val input: KStream[String, String] = builder.stream(Serdes.String(),
> > Serdes.String(), "TimeInputTopic")
> > val window: Windows[TimeWindow] = TimeWindows.of(60000).
> > advanceBy(60000).until(30000)
> >
> > val aggregated: KTable[Windowed[String], JInt] = input
> >   .mapValues((v: String) => parse(v)._2)
> >   .groupByKey(Serdes.String(), Serdes.Integer())
> >   .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window,
> > "TimeStore1")
> >
> > aggregated.foreach {
> >   (w: Windowed[String], s: JInt) =>
> >     val start = new DateTime(w.window().start(), DateTimeZone.UTC)
> >     val end = new DateTime(w.window().end(), DateTimeZone.UTC)
> >     println(s"Aggregated: $start..$end - ${w.key()} - $s")
> > }
> >
> > Here is the data being sent to TimeInputTopic:
> > a,1970-01-01T00:00:00Z 1
> > b,1970-01-01T00:00:01Z 2
> > a,1970-01-01T00:00:02Z 3
> > b,1970-01-01T00:05:01Z 10
> > b,1970-01-01T00:00:15Z 10
> >
> > Here is the output:
> > Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - a - 4
> > Aggregated: 1970-01-01T00:05:00.000Z..1970-01-01T00:06:00.000Z - b - 10
> > Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 12
> >
> > Here is what confuses me.
> > I would expect that once an event <b,05:01 10> is received, it should
> > update some sort of "current" time value (watermark?), and, because
> 05:01 is
> > bigger than 00:00..01:00 + 30 seconds of retention, either:
> >
> > A) Drop the bucket 00:00:00..00:01:00, and once an event <b,00:15 10> is
> > received, recreate this bucket. This means there would be two outputs for
> > 00:00..01:00:
> > Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> > Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 10
> > Not sure about this behavior because http://docs.confluent.
> > io/3.1.1/streams/developer-guide.html#windowing-a-stream is saying:
> > "Kafka Streams guarantees to keep a window for *at least this specified
> > time*". So I guess the window can be kept longer...
> >
> >
> > B) Just drop the incoming event <b,00:15 10> altogether. In this case
> > there would be only one output for 00:00..01:00:
> > Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> > I could expect this because http://docs.confluent.
> > io/3.1.1/streams/concepts.html#windowing is saying: "If a record arrives
> > after the retention period has passed, the record cannot be processed and
> > is dropped".
> >
> >
> > Hope all this makes sense. Few questions:
> >
> > – If the window can be kept in store longer, are there any thresholds
> when
> > it will finally be purged? For example, it would be nice to flush old
> > buckets if they are taking too much space.
> >
> > – How is "current" time value updated / how do Kafka Streams decide that
> > the retention period has passed? Does it maintain a watermark with the
> > biggest time seen?
> >
> > – What is the right mailing list to ask questions about Kafka Streams? I
> > had a choice between this one and Confluent Platform list, and given that
> > open source part of CP consists from patched vanilla Kafka, was not sure
> > where to write.
> >
> > Thanks,
> > Alex
> >
> >
> >
> >
> >
> >
>

Re: Kafka Streams window retention period question

Posted by Alexander Demidko <al...@stitchfix.com>.
Hi Matthias,

Thanks for such a thorough response!

I guess there are cases when a determinism might be preferred over
computing "more correct results" (e.g. in unit tests, where one manually
lays out an order of incoming events and wants to get an exact output), but
from now on I can simply assume that windows might be stored longer than
the specified time.

Few more questions if you don't mind.

- What should happen when the window 00:00..01:00 will finally get purged
(and the internal stream time will get bumped to say time 10:00) but then I
receive an event <b,00:15 10>? Will it create the 00:00..01:00 window again
or the event will be dropped because it's way older than the internal
stream time?

- I got a bit confused when you mentioned a key in the window name "open
(first) >>>b<<<-window". To make it clear – I assume that because in Kafka
Streams hopping/tumbling windows are aligned, an internal stream time is
not related to the aggregation keys but just to the input partitions,
right? I.e. if I have only one partition there will be only one internal
stream time watermark regardless of how many keys do I have? Will this
behavior be the same for sliding windows? Feel free to just point me to the
code :)

Alex


> Hi Alexander,
>
> first, both mailing list should be fine :)
>
> About internal time tracking: Kafka Streams tracks an internal "stream
> time" that is determined as the minimum "partition time" over all its
> input partitions.
>
> The "partition time" is tracked for each input partition individually
> and is the minimum timestamp of all currently buffered record of the
> partition.
>
> So depending on how many records from which partitions are fetch on
> poll() "stream time" gets advanced accordingly -- this is kinda
> non-deterministic because we cannot predict what poll() will return.
>
> Because all buffered records are considered, "stream time" is advance
> conservatively. The main idea about this is to keep windows open longer
> if we know in advance that there will be a late record (as we observe
> the late record in the buffer already). Thus, we can compute "more
> correct" results with regard to late arriving records.
> (Just a heads up, we might change this behavior in future releases --
> thus, it is not documented anywhere but in the code ;) )
>
>
>
> About retention time and purging windows: old windows should be dropped
> after "stream time" advances beyond the point on which it is guaranteed
> to maintain the window. It should happen "as soon as possible" but there
> is no strict guarantee (thus "kept at least").
>
> Furthermore, Streams applies a minimum retention time of 1 minute --
> thus, for your specific use case, the 30 seconds you do specify are not
> used (this is unfortunately not documented :( ). However, this in
> unrelated to the behavior you see -- I just mention it for completeness.
>
>
> Thus for you specific use case, streams time is most likely not advance
> to 5:01 when record <b,05:01 10> is processed (as record with TS=0:15 is
> most likely in the buffer) and thus, the next b-record with TS=15
> seconds will be added the the still open (first) b-window and both
> values 2 and 10 get added to 12.
>
>
> Also keep in mind that we do some deduplication on KTable result using
> an internal cache. This can also influence what output record you see.
> For further details see:
>
http://docs.confluent.io/current/streams/developer-guide.html#memory-management


> -Matthias


On Wed, Jan 4, 2017 at 5:16 PM, Alexander Demidko <
alexander.demidko@stitchfix.com> wrote:

> Hi folks,
>
> I'm experimenting with Kafka Streams windowed aggregation and came across
> window retention period behavior I don't fully understand.
> I'm using custom timestamp extractor which gets the timestamp from the
> payload. Values are aggregated using tumbling time windows and summed by
> the key.
> I am using kafka and kafka-streams with 0.10.1.1 version.
>
> Full code can be found at https://gist.github.com/xdralex/
> 845bcf8f06ab0cfcf9785d9f95450b88, but in general I'm doing the following:
>
> val input: KStream[String, String] = builder.stream(Serdes.String(),
> Serdes.String(), "TimeInputTopic")
> val window: Windows[TimeWindow] = TimeWindows.of(60000).
> advanceBy(60000).until(30000)
>
> val aggregated: KTable[Windowed[String], JInt] = input
>   .mapValues((v: String) => parse(v)._2)
>   .groupByKey(Serdes.String(), Serdes.Integer())
>   .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window,
> "TimeStore1")
>
> aggregated.foreach {
>   (w: Windowed[String], s: JInt) =>
>     val start = new DateTime(w.window().start(), DateTimeZone.UTC)
>     val end = new DateTime(w.window().end(), DateTimeZone.UTC)
>     println(s"Aggregated: $start..$end - ${w.key()} - $s")
> }
>
> Here is the data being sent to TimeInputTopic:
> a,1970-01-01T00:00:00Z 1
> b,1970-01-01T00:00:01Z 2
> a,1970-01-01T00:00:02Z 3
> b,1970-01-01T00:05:01Z 10
> b,1970-01-01T00:00:15Z 10
>
> Here is the output:
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - a - 4
> Aggregated: 1970-01-01T00:05:00.000Z..1970-01-01T00:06:00.000Z - b - 10
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 12
>
> Here is what confuses me.
> I would expect that once an event <b,05:01 10> is received, it should
> update some sort of "current" time value (watermark?), and, because 05:01 is
> bigger than 00:00..01:00 + 30 seconds of retention, either:
>
> A) Drop the bucket 00:00:00..00:01:00, and once an event <b,00:15 10> is
> received, recreate this bucket. This means there would be two outputs for
> 00:00..01:00:
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 10
> Not sure about this behavior because http://docs.confluent.
> io/3.1.1/streams/developer-guide.html#windowing-a-stream is saying:
> "Kafka Streams guarantees to keep a window for *at least this specified
> time*". So I guess the window can be kept longer...
>
>
> B) Just drop the incoming event <b,00:15 10> altogether. In this case
> there would be only one output for 00:00..01:00:
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> I could expect this because http://docs.confluent.
> io/3.1.1/streams/concepts.html#windowing is saying: "If a record arrives
> after the retention period has passed, the record cannot be processed and
> is dropped".
>
>
> Hope all this makes sense. Few questions:
>
> – If the window can be kept in store longer, are there any thresholds when
> it will finally be purged? For example, it would be nice to flush old
> buckets if they are taking too much space.
>
> – How is "current" time value updated / how do Kafka Streams decide that
> the retention period has passed? Does it maintain a watermark with the
> biggest time seen?
>
> – What is the right mailing list to ask questions about Kafka Streams? I
> had a choice between this one and Confluent Platform list, and given that
> open source part of CP consists from patched vanilla Kafka, was not sure
> where to write.
>
> Thanks,
> Alex
>
>
>
>
>
>