Posted to users@kafka.apache.org by Navneeth Krishnan <re...@gmail.com> on 2019/10/15 07:56:08 UTC

Tumbling Time Window

Hi All,

I'm trying to create a two-second tumbling time window using the Processor
API (PAPI). I have a TimestampedWindowStore with both the retention period
and the window size set to 2 seconds, and retainDuplicates set to false.

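Stores.timestampedWindowStoreBuilder(
        Stores.persistentTimestampedWindowStore("window-store-1",
                Duration.ofMillis(2000), Duration.ofMillis(2000), false), // retention, window size, retainDuplicates
        keySerde, valueSerde); // keySerde, valueSerde: placeholder serdes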


In my process function I keep adding data to the state store, and I have a
scheduled task that runs every second to collect the data inside the
window. I expected the window store iterator to return the key and a list
of records for that window, but it returns just a window object and a
single record. How can I aggregate these records and send them downstream
together?

I looked at KGroupedStreamImpl for reference, but it seems quite confusing.
Any advice on how this can be done?

Thanks

Re: Tumbling Time Window

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I guess you have 3 options:

1) when a record for a different key arrives, check whether there is data
in the store that you want to flush
2) register an event-time punctuation (maybe for each key? or for a
range of keys?) and check on a regular basis whether there is anything
you want to forward
3) similar to (2), but use wall-clock time
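Options (2) and (3) differ only in which clock drives the check. As a rough, dependency-free sketch (all class and method names are hypothetical, and a per-key sum stands in for whatever aggregation is actually needed), aggregates are kept per window start and key, and a `punctuate(now)` call forwards and evicts every window whose end is at or before `now`, no matter which key the latest record had. In a real PAPI processor the map would be the window store, `forwarded` would be `context.forward(...)`, and `punctuate` would be registered in `init()` with `context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, ...)` for option (2) or `PunctuationType.WALL_CLOCK_TIME` for option (3).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a PAPI processor that flushes closed tumbling windows
// from a periodic callback instead of waiting for new data on the same key.
final class PunctuatedWindowFlusher {
    private final long windowSizeMs;
    // windowStart -> (key -> running sum); the outer TreeMap keeps windows
    // ordered by start time, the inner one keeps output order deterministic.
    private final TreeMap<Long, TreeMap<String, Long>> windows = new TreeMap<>();
    // Stands in for context.forward(...): collects "key@windowStart=sum".
    final List<String> forwarded = new ArrayList<>();

    PunctuatedWindowFlusher(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Called for every input record, like Processor#process.
    // Assumes non-negative timestamps, as Kafka record timestamps are.
    void process(String key, long value, long timestampMs) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        windows.computeIfAbsent(windowStart, ws -> new TreeMap<>())
               .merge(key, value, Long::sum);
    }

    // Called periodically, like a Punctuator. `now` is stream time for
    // option (2) or wall-clock time for option (3).
    void punctuate(long now) {
        Iterator<Map.Entry<Long, TreeMap<String, Long>>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TreeMap<String, Long>> window = it.next();
            long windowStart = window.getKey();
            if (windowStart + windowSizeMs > now) {
                break; // this window and all later ones are still open
            }
            for (Map.Entry<String, Long> agg : window.getValue().entrySet()) {
                forwarded.add(agg.getKey() + "@" + windowStart + "=" + agg.getValue());
            }
            it.remove(); // evict the window once its results are forwarded
        }
    }
}
```

One design consequence: with `STREAM_TIME`, stream time only advances when new records (for any key) arrive, so a fully idle input never triggers the flush; `WALL_CLOCK_TIME` fires even then, at the cost of results that depend on processing time.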


-Matthias


Re: Tumbling Time Window

Posted by Navneeth Krishnan <re...@gmail.com> on 10/16/19 1:50 PM.
Thanks a lot, Matthias. I looked at the aggregation, and I'm fine with
aggregating data before forwarding it downstream, but
KStreamWindowAggregateProcessor::process
uses the incoming record's key to determine whether data has to be
aggregated and forwarded. My worry is this: if I have a tumbling window of
2 seconds that contains just one record, how can I evict that record and
forward it downstream when no more data arrives for the same key?

In short, is there a way to evict data from a window even when there is
no incoming data for its key?

Thanks

On Tue, Oct 15, 2019 at 1:40 AM Matthias J. Sax <ma...@confluent.io>
wrote:

> A window store contains key-value pair, with key being the window id
> (key + window-start-timestamp) and the value being the current
> aggregate. It does not store a list of input values.
>
> If you want to store a list of input values, you would need to make the
> value type a `List` (or similar) or values. However, this is not
> recommended and for large window might not work as all, because each
> key-value pair will be stored in the corresponding changelog topic as a
> single message (hence, you might hit the `max.message.bytes` limit --
> you could of course increase it).
>
> Hence, it's recommended to re-compute the aggregation for each input
> record incrementally if possible (not sure what aggregation you want to
> do and if that is possible). That is also what
> `KStreamWindowAggregateProcessor` does:
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java#L105
>
> Hope that helps.
>
>
> -Matthias
>
> On 10/15/19 12:56 AM, Navneeth Krishnan wrote:
> > Hi All,
> >
> > I'm trying to create a tumbling time window of two seconds using PAPI. I
> > have a TimestampWindowStore with both retention and window size as 2
> > seconds and retainDuplicates as false.
> >
> >
> Stores.timestampedWindowStoreBuilder(Stores.persistentTimestampedWindowStore("window-store-1",
> >         Duration.ofMillis(2000), Duration.ofMillis(2000), false)
> >
> >
> > In my process function I keep adding data to the state store and I have a
> > scheduled task initiated which runs every seconds to collect the data
> > inside the window. The way I thought the window store iterator will
> return
> > is Key and List of records for that window but it's returning just a
> window
> > object and a record. How can I aggregate this and collectively send to
> > downstream?
> >
> > I looked at KGroupedStreamImpl for reference but it seems quite
> confusing.
> > Any advice on how this can be done?
> >
> > Thanks
> >
>
>

Re: Tumbling Time Window

Posted by "Matthias J. Sax" <ma...@confluent.io>.
A window store contains key-value pairs, with the key being the window id
(key + window-start timestamp) and the value being the current
aggregate. It does not store a list of input values.
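To make the window id concrete: for a tumbling window (advance equal to size), the window start can be derived from the record timestamp alone, so every record of one key that falls into the same 2-second window maps to the same store entry. The class below is a hypothetical, dependency-free stand-in for Kafka's `Windowed<K>`, written only to illustrate that mapping.

```java
import java.util.Objects;

// Hypothetical stand-in for Kafka's Windowed<K>: record key plus window start.
final class WindowId {
    final String key;
    final long windowStartMs;

    WindowId(String key, long windowStartMs) {
        this.key = key;
        this.windowStartMs = windowStartMs;
    }

    // Tumbling windows are aligned to the epoch: [n * size, (n + 1) * size).
    // Assumes non-negative timestamps, as Kafka record timestamps are.
    static WindowId forRecord(String key, long timestampMs, long windowSizeMs) {
        return new WindowId(key, timestampMs - (timestampMs % windowSizeMs));
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof WindowId)) {
            return false;
        }
        WindowId other = (WindowId) o;
        return windowStartMs == other.windowStartMs && key.equals(other.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, windowStartMs);
    }
}
```

With the real API, iterating a `TimestampedWindowStore<K, V>` (for example via `all()`) yields `KeyValue<Windowed<K>, ValueAndTimestamp<V>>` entries, i.e. one window id and one value per window, which matches the "window object and a record" observed in the original question.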

If you want to store a list of input values, you would need to make the
value type a `List` (or similar) of values. However, this is not
recommended and might not work at all for large windows, because each
key-value pair is stored in the corresponding changelog topic as a
single message (hence, you might hit the `max.message.bytes` limit --
you could of course increase it).

Hence, it's recommended to update the aggregate incrementally with each
input record, if possible (not sure what aggregation you want to
do and whether that is possible). That is also what
`KStreamWindowAggregateProcessor` does:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java#L105
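A minimal sketch of that incremental pattern, assuming a count-and-sum aggregate (from which an average can be derived): for each input record, look up the current aggregate for its window, fold the new value in, and write it back, so the store only ever holds one small value per window and key. A `HashMap` stands in for the window store; in PAPI code the lookup and write would roughly correspond to `fetch(key, windowStart)` and `put(key, aggregate, windowStart)` on the window store. The class names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical running aggregate: count and sum are enough to derive an
// average without keeping the individual input values.
final class CountSum {
    final long count;
    final long sum;

    CountSum(long count, long sum) {
        this.count = count;
        this.sum = sum;
    }

    CountSum add(long value) {
        return new CountSum(count + 1, sum + value);
    }

    double average() {
        return count == 0 ? 0.0 : (double) sum / count;
    }
}

// Hypothetical sketch of the per-record fetch / aggregate / put cycle.
final class IncrementalWindowAggregator {
    private final long windowSizeMs;
    // Stand-in for the window store: (key, windowStart) -> current aggregate.
    private final Map<Map.Entry<String, Long>, CountSum> store = new HashMap<>();

    IncrementalWindowAggregator(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Returns the updated aggregate, which a processor could also forward.
    // Assumes non-negative timestamps, as Kafka record timestamps are.
    CountSum process(String key, long value, long timestampMs) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        Map.Entry<String, Long> windowId = Map.entry(key, windowStart);
        CountSum oldAgg = store.getOrDefault(windowId, new CountSum(0, 0)); // initializer
        CountSum newAgg = oldAgg.add(value);                                 // aggregator
        store.put(windowId, newAgg);
        return newAgg;
    }
}
```

Because only the running aggregate is written back, each changelog message stays small regardless of how many records fall into the window.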

Hope that helps.


-Matthias