You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Garrett Barton <ga...@gmail.com> on 2017/05/05 02:36:22 UTC

Verify time semantics through topology

I think I have an understanding of how Kafka Streams is handling time
behind the scenes and would like someone to verify it for me.  The actual
reason is I am running into behavior where I only can join two streams for
a little, then it stops working.

Assuming a topology like this:

FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
countKStream.
FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
sumKStream.

countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
reduce() = avgKTable

Given that FEED is populated into kafka with the event time for the
timestamp (and just to make sure I have a TimeExtractor extracting the time
again), I believe time processing happens like this (ET = Event Time, PT =
Process Time):

FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
countKStream.
ET -> ET -> ET -> PT

FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
sumKStream.
ET -> ET -> ET -> PT

countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
reduce() = avgKTable
PT -> PT -> PT

Thus my join has really attempted to join records based on kafka's
processing time from the previous aggregations and not by event time like I
want.  When the streams start things seem to work well, avg topic/stores
populate.  After a few minutes count gets way ahead of sum and then avg
completely stops populating anything.  My hunch is that the processing time
gets outside that 1 minute join window and it no longer joins, increasing
the until to any number (tried 1 year) has no effect either.

Is this the correct way to calculate an average over a 1 minute event time
window with say a 14 day lag time (to load old data)?

Thank you all!

Re: Verify time semantics through topology

Posted by "Matthias J. Sax" <ma...@confluent.io>.
About the join:

Joins work perfectly fine if you apply them to "plain records" you read
from a topic. When joining records, the records timestamp is used to
compute the join result.

The "problem" in your case is that you apply the join to a windowed
aggregation result. And thus, there is no "record timestamp" and Streams
falls back to "stream time" to assign a timestamp to the window result
record that can be used for the join.

Glad it works with my suggested solution. :)

-Matthias

On 5/5/17 6:41 PM, Garrett Barton wrote:
> Matthias,
>  That does make a lot of sense, so Streams never will create time its
> always using a byproduct of a record time passed into it.  Thus in theory
> unless I force a change somewhere in a flow, the flow will stay as I start
> it.
> 
> The confusing part is around joins, since 'stream time' is kinda loosely
> derived from where kafka streams thinks it is globally from consuming the
> upstream topic, and this is where the timing can get out of sync.  And it
> did break my original flow after a few minutes every single time.  That
> part kind of makes me think that in a join the window and until likely
> should be the same value, given that the streams could be off quite a bit.
> But that is another topic.
> 
>  I redid my stream as you suggested and it worked wonderfully, shrunk the
> flows considerably, and I can finally calculate averages consistently
> longer than a few minutes. Thank you!
> 
> On Fri, May 5, 2017 at 1:06 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> That part of time tracking is a little tricky.
>>
>> Streams internally maintains "stream time" -- this model the progress of
>> your application over all input partitions and topics, and is based on
>> the timestamps return by the timestamp extractor. Thus, if timestamp
>> extractor returns even time, "stream time" will we event-time based,
>> too. (Streams, never calls System.currentTimeMillis() so assign
>> timestamps.)
>>
>> This internally tracked "stream time" is used in punctuate() (yes, low
>> level API only) and for window operations to define the output record's
>> timestamp. As "stream time" depends on record processing order, it might
>> vary a little bit (the computation of it itself is deterministic, but it
>> depends what records get fetched from the brokers, and the fetching step
>> is not deterministic, making "global" processing order
>> non-deterministic, too -- what is a general Kafka property: order is
>> only guaranteed within a single partitions, but not across partitions).
>> This little varying in "stream time" computation might break you join
>> step in your original code... You would need to base the join on
>> window-start time and not on event-time to get it right (and thus, you
>> would not even need a windowed join). But the join is to "clumsy" anyway.
>>
>> Does this answer all your questions?
>>
>> (We don't document those details on purpose, because it's an internal
>> design and we want the flexibility to change this if required -- thus,
>> you should also not rely on "stream time" advance assumptions in your
>> code.)
>>
>>
>> -Matthias
>>
>>
>> On 5/5/17 8:09 AM, Garrett Barton wrote:
>>> That does actually, I never thought about a custom value object to hold
>> the
>>> Count/Sum variables. Thank you!
>>>
>>> For the time semantics here is where I got hung up, copied from kafka
>>> streams documentation:
>>>
>>> Finally, whenever a Kafka Streams application writes records to Kafka,
>> then
>>> it will also assign timestamps to these new records. The way the
>> timestamps
>>> are assigned depends on the context:
>>>
>>>    - When new output records are generated via processing some input
>>>    record, for example, context.forward() triggered in the process()
>>>    function call, output record timestamps are inherited from input
>> record
>>>    timestamps directly.
>>>       - *Given I set things to Event Time, this would output Event Time
>>>       correct?*
>>>       - When new output records are generated via periodic functions such
>>>    as punctuate(), the output record timestamp is defined as the current
>>>    internal time (obtained through context.timestamp()) of the stream
>> task.
>>>       - *This is where I am confused, what operations count as a
>>>       punctuate()? Just the low level api? And are these thus Process
>> time?*
>>>       - For aggregations, the timestamp of a resulting aggregate update
>>>    record will be that of the latest arrived input record that triggered
>> the
>>>    update.
>>>       - *This sounds like last used Event Time, correct?*
>>>
>>>
>>> On Fri, May 5, 2017 at 1:16 AM, Matthias J. Sax <ma...@confluent.io>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am not sure if I understand correctly:  If you use default
>>>> TimestampExtractor, the whole pipeline will be event-time based.
>>>>
>>>> However, as you want to compute the AVG, I would recommend a different
>>>> pattern anyway:
>>>>
>>>> FEED -> groupByKey() -> window() -> aggregate() -> mapValues() =
>> avgKTable
>>>>
>>>> In aggregate, you compute both count and sum and emit <k,(cnt,sum)>
>>>> records (ie, a custom data data for value) and in mapValue() you compute
>>>> <k,avg>.
>>>>
>>>> Hope this helps.
>>>>
>>>> -Matthias
>>>>
>>>> On 5/4/17 7:36 PM, Garrett Barton wrote:
>>>>> I think I have an understanding of how Kafka Streams is handling time
>>>>> behind the scenes and would like someone to verify it for me.  The
>> actual
>>>>> reason is I am running into behavior where I only can join two streams
>>>> for
>>>>> a little, then it stops working.
>>>>>
>>>>> Assuming a topology like this:
>>>>>
>>>>> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
>>>>> countKStream.
>>>>> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
>>>>> sumKStream.
>>>>>
>>>>> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
>>>>> reduce() = avgKTable
>>>>>
>>>>> Given that FEED is populated into kafka with the event time for the
>>>>> timestamp (and just to make sure I have a TimeExtractor extracting the
>>>> time
>>>>> again), I believe time processing happens like this (ET = Event Time,
>> PT
>>>> =
>>>>> Process Time):
>>>>>
>>>>> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
>>>>> countKStream.
>>>>> ET -> ET -> ET -> PT
>>>>>
>>>>> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
>>>>> sumKStream.
>>>>> ET -> ET -> ET -> PT
>>>>>
>>>>> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
>>>>> reduce() = avgKTable
>>>>> PT -> PT -> PT
>>>>>
>>>>> Thus my join has really attempted to join records based on kafka's
>>>>> processing time from the previous aggregations and not by event time
>>>> like I
>>>>> want.  When the streams start things seem to work well, avg
>> topic/stores
>>>>> populate.  After a few minutes count gets way ahead of sum and then avg
>>>>> completely stops populating anything.  My hunch is that the processing
>>>> time
>>>>> gets outside that 1 minute join window and it no longer joins,
>> increasing
>>>>> the until to any number (tried 1 year) has no effect either.
>>>>>
>>>>> Is this the correct way to calculate an average over a 1 minute event
>>>> time
>>>>> window with say a 14 day lag time (to load old data)?
>>>>>
>>>>> Thank you all!
>>>>>
>>>>
>>>>
>>>
>>
>>
> 


Re: Verify time semantics through topology

Posted by Garrett Barton <ga...@gmail.com>.
Matthias,
 That does make a lot of sense, so Streams never will create time its
always using a byproduct of a record time passed into it.  Thus in theory
unless I force a change somewhere in a flow, the flow will stay as I start
it.

The confusing part is around joins, since 'stream time' is kinda loosely
derived from where kafka streams thinks it is globally from consuming the
upstream topic, and this is where the timing can get out of sync.  And it
did break my original flow after a few minutes every single time.  That
part kind of makes me think that in a join the window and until likely
should be the same value, given that the streams could be off quite a bit.
But that is another topic.

 I redid my stream as you suggested and it worked wonderfully, shrunk the
flows considerably, and I can finally calculate averages consistently
longer than a few minutes. Thank you!

On Fri, May 5, 2017 at 1:06 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> That part of time tracking is a little tricky.
>
> Streams internally maintains "stream time" -- this model the progress of
> your application over all input partitions and topics, and is based on
> the timestamps return by the timestamp extractor. Thus, if timestamp
> extractor returns even time, "stream time" will we event-time based,
> too. (Streams, never calls System.currentTimeMillis() so assign
> timestamps.)
>
> This internally tracked "stream time" is used in punctuate() (yes, low
> level API only) and for window operations to define the output record's
> timestamp. As "stream time" depends on record processing order, it might
> vary a little bit (the computation of it itself is deterministic, but it
> depends what records get fetched from the brokers, and the fetching step
> is not deterministic, making "global" processing order
> non-deterministic, too -- what is a general Kafka property: order is
> only guaranteed within a single partitions, but not across partitions).
> This little varying in "stream time" computation might break you join
> step in your original code... You would need to base the join on
> window-start time and not on event-time to get it right (and thus, you
> would not even need a windowed join). But the join is to "clumsy" anyway.
>
> Does this answer all your questions?
>
> (We don't document those details on purpose, because it's an internal
> design and we want the flexibility to change this if required -- thus,
> you should also not rely on "stream time" advance assumptions in your
> code.)
>
>
> -Matthias
>
>
> On 5/5/17 8:09 AM, Garrett Barton wrote:
> > That does actually, I never thought about a custom value object to hold
> the
> > Count/Sum variables. Thank you!
> >
> > For the time semantics here is where I got hung up, copied from kafka
> > streams documentation:
> >
> > Finally, whenever a Kafka Streams application writes records to Kafka,
> then
> > it will also assign timestamps to these new records. The way the
> timestamps
> > are assigned depends on the context:
> >
> >    - When new output records are generated via processing some input
> >    record, for example, context.forward() triggered in the process()
> >    function call, output record timestamps are inherited from input
> record
> >    timestamps directly.
> >       - *Given I set things to Event Time, this would output Event Time
> >       correct?*
> >       - When new output records are generated via periodic functions such
> >    as punctuate(), the output record timestamp is defined as the current
> >    internal time (obtained through context.timestamp()) of the stream
> task.
> >       - *This is where I am confused, what operations count as a
> >       punctuate()? Just the low level api? And are these thus Process
> time?*
> >       - For aggregations, the timestamp of a resulting aggregate update
> >    record will be that of the latest arrived input record that triggered
> the
> >    update.
> >       - *This sounds like last used Event Time, correct?*
> >
> >
> > On Fri, May 5, 2017 at 1:16 AM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> Hi,
> >>
> >> I am not sure if I understand correctly:  If you use default
> >> TimestampExtractor, the whole pipeline will be event-time based.
> >>
> >> However, as you want to compute the AVG, I would recommend a different
> >> pattern anyway:
> >>
> >> FEED -> groupByKey() -> window() -> aggregate() -> mapValues() =
> avgKTable
> >>
> >> In aggregate, you compute both count and sum and emit <k,(cnt,sum)>
> >> records (ie, a custom data data for value) and in mapValue() you compute
> >> <k,avg>.
> >>
> >> Hope this helps.
> >>
> >> -Matthias
> >>
> >> On 5/4/17 7:36 PM, Garrett Barton wrote:
> >>> I think I have an understanding of how Kafka Streams is handling time
> >>> behind the scenes and would like someone to verify it for me.  The
> actual
> >>> reason is I am running into behavior where I only can join two streams
> >> for
> >>> a little, then it stops working.
> >>>
> >>> Assuming a topology like this:
> >>>
> >>> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
> >>> countKStream.
> >>> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
> >>> sumKStream.
> >>>
> >>> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
> >>> reduce() = avgKTable
> >>>
> >>> Given that FEED is populated into kafka with the event time for the
> >>> timestamp (and just to make sure I have a TimeExtractor extracting the
> >> time
> >>> again), I believe time processing happens like this (ET = Event Time,
> PT
> >> =
> >>> Process Time):
> >>>
> >>> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
> >>> countKStream.
> >>> ET -> ET -> ET -> PT
> >>>
> >>> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
> >>> sumKStream.
> >>> ET -> ET -> ET -> PT
> >>>
> >>> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
> >>> reduce() = avgKTable
> >>> PT -> PT -> PT
> >>>
> >>> Thus my join has really attempted to join records based on kafka's
> >>> processing time from the previous aggregations and not by event time
> >> like I
> >>> want.  When the streams start things seem to work well, avg
> topic/stores
> >>> populate.  After a few minutes count gets way ahead of sum and then avg
> >>> completely stops populating anything.  My hunch is that the processing
> >> time
> >>> gets outside that 1 minute join window and it no longer joins,
> increasing
> >>> the until to any number (tried 1 year) has no effect either.
> >>>
> >>> Is this the correct way to calculate an average over a 1 minute event
> >> time
> >>> window with say a 14 day lag time (to load old data)?
> >>>
> >>> Thank you all!
> >>>
> >>
> >>
> >
>
>

Re: Verify time semantics through topology

Posted by "Matthias J. Sax" <ma...@confluent.io>.
That part of time tracking is a little tricky.

Streams internally maintains "stream time" -- this model the progress of
your application over all input partitions and topics, and is based on
the timestamps return by the timestamp extractor. Thus, if timestamp
extractor returns even time, "stream time" will we event-time based,
too. (Streams, never calls System.currentTimeMillis() so assign timestamps.)

This internally tracked "stream time" is used in punctuate() (yes, low
level API only) and for window operations to define the output record's
timestamp. As "stream time" depends on record processing order, it might
vary a little bit (the computation of it itself is deterministic, but it
depends what records get fetched from the brokers, and the fetching step
is not deterministic, making "global" processing order
non-deterministic, too -- what is a general Kafka property: order is
only guaranteed within a single partitions, but not across partitions).
This little varying in "stream time" computation might break you join
step in your original code... You would need to base the join on
window-start time and not on event-time to get it right (and thus, you
would not even need a windowed join). But the join is to "clumsy" anyway.

Does this answer all your questions?

(We don't document those details on purpose, because it's an internal
design and we want the flexibility to change this if required -- thus,
you should also not rely on "stream time" advance assumptions in your code.)


-Matthias


On 5/5/17 8:09 AM, Garrett Barton wrote:
> That does actually, I never thought about a custom value object to hold the
> Count/Sum variables. Thank you!
> 
> For the time semantics here is where I got hung up, copied from kafka
> streams documentation:
> 
> Finally, whenever a Kafka Streams application writes records to Kafka, then
> it will also assign timestamps to these new records. The way the timestamps
> are assigned depends on the context:
> 
>    - When new output records are generated via processing some input
>    record, for example, context.forward() triggered in the process()
>    function call, output record timestamps are inherited from input record
>    timestamps directly.
>       - *Given I set things to Event Time, this would output Event Time
>       correct?*
>       - When new output records are generated via periodic functions such
>    as punctuate(), the output record timestamp is defined as the current
>    internal time (obtained through context.timestamp()) of the stream task.
>       - *This is where I am confused, what operations count as a
>       punctuate()? Just the low level api? And are these thus Process time?*
>       - For aggregations, the timestamp of a resulting aggregate update
>    record will be that of the latest arrived input record that triggered the
>    update.
>       - *This sounds like last used Event Time, correct?*
> 
> 
> On Fri, May 5, 2017 at 1:16 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> Hi,
>>
>> I am not sure if I understand correctly:  If you use default
>> TimestampExtractor, the whole pipeline will be event-time based.
>>
>> However, as you want to compute the AVG, I would recommend a different
>> pattern anyway:
>>
>> FEED -> groupByKey() -> window() -> aggregate() -> mapValues() = avgKTable
>>
>> In aggregate, you compute both count and sum and emit <k,(cnt,sum)>
>> records (ie, a custom data data for value) and in mapValue() you compute
>> <k,avg>.
>>
>> Hope this helps.
>>
>> -Matthias
>>
>> On 5/4/17 7:36 PM, Garrett Barton wrote:
>>> I think I have an understanding of how Kafka Streams is handling time
>>> behind the scenes and would like someone to verify it for me.  The actual
>>> reason is I am running into behavior where I only can join two streams
>> for
>>> a little, then it stops working.
>>>
>>> Assuming a topology like this:
>>>
>>> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
>>> countKStream.
>>> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
>>> sumKStream.
>>>
>>> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
>>> reduce() = avgKTable
>>>
>>> Given that FEED is populated into kafka with the event time for the
>>> timestamp (and just to make sure I have a TimeExtractor extracting the
>> time
>>> again), I believe time processing happens like this (ET = Event Time, PT
>> =
>>> Process Time):
>>>
>>> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
>>> countKStream.
>>> ET -> ET -> ET -> PT
>>>
>>> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
>>> sumKStream.
>>> ET -> ET -> ET -> PT
>>>
>>> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
>>> reduce() = avgKTable
>>> PT -> PT -> PT
>>>
>>> Thus my join has really attempted to join records based on kafka's
>>> processing time from the previous aggregations and not by event time
>> like I
>>> want.  When the streams start things seem to work well, avg topic/stores
>>> populate.  After a few minutes count gets way ahead of sum and then avg
>>> completely stops populating anything.  My hunch is that the processing
>> time
>>> gets outside that 1 minute join window and it no longer joins, increasing
>>> the until to any number (tried 1 year) has no effect either.
>>>
>>> Is this the correct way to calculate an average over a 1 minute event
>> time
>>> window with say a 14 day lag time (to load old data)?
>>>
>>> Thank you all!
>>>
>>
>>
> 


Re: Verify time semantics through topology

Posted by Garrett Barton <ga...@gmail.com>.
That does actually, I never thought about a custom value object to hold the
Count/Sum variables. Thank you!

For the time semantics here is where I got hung up, copied from kafka
streams documentation:

Finally, whenever a Kafka Streams application writes records to Kafka, then
it will also assign timestamps to these new records. The way the timestamps
are assigned depends on the context:

   - When new output records are generated via processing some input
   record, for example, context.forward() triggered in the process()
   function call, output record timestamps are inherited from input record
   timestamps directly.
      - *Given I set things to Event Time, this would output Event Time
      correct?*
      - When new output records are generated via periodic functions such
   as punctuate(), the output record timestamp is defined as the current
   internal time (obtained through context.timestamp()) of the stream task.
      - *This is where I am confused, what operations count as a
      punctuate()? Just the low level api? And are these thus Process time?*
      - For aggregations, the timestamp of a resulting aggregate update
   record will be that of the latest arrived input record that triggered the
   update.
      - *This sounds like last used Event Time, correct?*


On Fri, May 5, 2017 at 1:16 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Hi,
>
> I am not sure if I understand correctly:  If you use default
> TimestampExtractor, the whole pipeline will be event-time based.
>
> However, as you want to compute the AVG, I would recommend a different
> pattern anyway:
>
> FEED -> groupByKey() -> window() -> aggregate() -> mapValues() = avgKTable
>
> In aggregate, you compute both count and sum and emit <k,(cnt,sum)>
> records (ie, a custom data data for value) and in mapValue() you compute
> <k,avg>.
>
> Hope this helps.
>
> -Matthias
>
> On 5/4/17 7:36 PM, Garrett Barton wrote:
> > I think I have an understanding of how Kafka Streams is handling time
> > behind the scenes and would like someone to verify it for me.  The actual
> > reason is I am running into behavior where I only can join two streams
> for
> > a little, then it stops working.
> >
> > Assuming a topology like this:
> >
> > FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
> > countKStream.
> > FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
> > sumKStream.
> >
> > countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
> > reduce() = avgKTable
> >
> > Given that FEED is populated into kafka with the event time for the
> > timestamp (and just to make sure I have a TimeExtractor extracting the
> time
> > again), I believe time processing happens like this (ET = Event Time, PT
> =
> > Process Time):
> >
> > FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
> > countKStream.
> > ET -> ET -> ET -> PT
> >
> > FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
> > sumKStream.
> > ET -> ET -> ET -> PT
> >
> > countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
> > reduce() = avgKTable
> > PT -> PT -> PT
> >
> > Thus my join has really attempted to join records based on kafka's
> > processing time from the previous aggregations and not by event time
> like I
> > want.  When the streams start things seem to work well, avg topic/stores
> > populate.  After a few minutes count gets way ahead of sum and then avg
> > completely stops populating anything.  My hunch is that the processing
> time
> > gets outside that 1 minute join window and it no longer joins, increasing
> > the until to any number (tried 1 year) has no effect either.
> >
> > Is this the correct way to calculate an average over a 1 minute event
> time
> > window with say a 14 day lag time (to load old data)?
> >
> > Thank you all!
> >
>
>

Re: Verify time semantics through topology

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi,

I am not sure if I understand correctly:  If you use default
TimestampExtractor, the whole pipeline will be event-time based.

However, as you want to compute the AVG, I would recommend a different
pattern anyway:

FEED -> groupByKey() -> window() -> aggregate() -> mapValues() = avgKTable

In aggregate, you compute both count and sum and emit <k,(cnt,sum)>
records (ie, a custom data data for value) and in mapValue() you compute
<k,avg>.

Hope this helps.

-Matthias

On 5/4/17 7:36 PM, Garrett Barton wrote:
> I think I have an understanding of how Kafka Streams is handling time
> behind the scenes and would like someone to verify it for me.  The actual
> reason is I am running into behavior where I only can join two streams for
> a little, then it stops working.
> 
> Assuming a topology like this:
> 
> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
> countKStream.
> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
> sumKStream.
> 
> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
> reduce() = avgKTable
> 
> Given that FEED is populated into kafka with the event time for the
> timestamp (and just to make sure I have a TimeExtractor extracting the time
> again), I believe time processing happens like this (ET = Event Time, PT =
> Process Time):
> 
> FEED -> groupByKey() -> window(1 min) -> count() -> toStream() =
> countKStream.
> ET -> ET -> ET -> PT
> 
> FEED -> groupByKey() -> window(1 min) -> reduce() -> toStream() =
> sumKStream.
> ET -> ET -> ET -> PT
> 
> countKStream.join(sumKStream) -> joinWindow(1 min, until 1 month) ->
> reduce() = avgKTable
> PT -> PT -> PT
> 
> Thus my join has really attempted to join records based on kafka's
> processing time from the previous aggregations and not by event time like I
> want.  When the streams start things seem to work well, avg topic/stores
> populate.  After a few minutes count gets way ahead of sum and then avg
> completely stops populating anything.  My hunch is that the processing time
> gets outside that 1 minute join window and it no longer joins, increasing
> the until to any number (tried 1 year) has no effect either.
> 
> Is this the correct way to calculate an average over a 1 minute event time
> window with say a 14 day lag time (to load old data)?
> 
> Thank you all!
>