Posted to users@kafka.apache.org by Jon Yeargers <jo...@cedexis.com> on 2016/12/11 02:11:58 UTC

How does 'TimeWindows.of().until()' work?

I've added the 'until()' clause to some aggregation steps and it's working
wonders for keeping the size of the state store in useful boundaries... But
I'm not 100% clear on how it works.

What is implied by the '.until()' clause? What determines when to stop
receiving further data - is it clock time (since the window was created)?
It seems problematic for it to refer to EventTime as this may bounce all
over the place. For non-overlapping windows a given record can only fall
into a single aggregation period - so when would a value get discarded?

I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 * 1000L).until(10 *
1000L))' - but what is this accomplishing?

Re: How does 'TimeWindows.of().until()' work?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
> So a given window (with a '.until()' setting) is triggered for closing by
> the presence of a record outside the .until() setting?

Yes (and all windows do have a setting for it -- if you do not call it
in your code, the default is 1 day).

However, in Kafka Streams, there is no notion of "closing a window" as
other systems do. Thus we do not use the term "closing" but "discarding"
for this case, as no computation will be triggered -- the window is just
dropped. A computation is triggered for each update of the window, i.e.,
for each record that falls into a window.


> If the timestamps for records jump about by a value larger than the .until
> value, you could have windows being created / deleted quite a bit then?

Yes. But you have to think about it differently. A record will not "jump
ahead"; rather, all records (with a smaller timestamp than this record) that
are processed after this record are late. And if they are too late, we do
not guarantee that they are processed.

Think of it like this:


1,2,3,4.........nothing for a long time......100,6,7,8

So the record with ts=100 did not jump ahead, but the records with ts=6,7,8
are super late.



-Matthias
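
[As an illustrative sketch of the windowed aggregation being discussed -- a
hedged example, not code from the thread (the topic, store name, and 1-day
retention are made-up values; count() stands in for the aggregate() call):

    // Count records per key in 1-minute windows; keep each window around for
    // 1 day so that late-arriving records can still be added to it.
    KStream<String, String> stream = builder.stream("input-topic"); // placeholder topic
    KTable<Windowed<String>, Long> counts = stream
        .groupByKey()
        .count(TimeWindows.of(60 * 1000L)               // window size: 1 minute
                          .until(24 * 3600 * 1000L),    // retention: 1 day (>= window size)
               "counts-store");                         // placeholder store name

With these values a window is only dropped once stream-time has advanced one
day past it, whereas the original snippet above sets until() smaller than the
window size, which -- as the replies note -- should not be done.]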






Re: How does 'TimeWindows.of().until()' work?

Posted by Jon Yeargers <jo...@cedexis.com>.
So a given window (with a '.until()' setting) is triggered for closing by
the presence of a record outside the .until() setting?

If the timestamps for records jump about by a value larger than the .until
value, you could have windows being created / deleted quite a bit then?

On Tue, Dec 13, 2016 at 9:57 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> First, windows are only created if there is actual data for a window. So
> you get windows [0, 50), [25, 75), [50, 100) only if there are records
> falling into each window (btw: window start-time is inclusive while
> window end time is exclusive). If you have only 2 records with, let's say,
> ts=20 and ts=90, you will not have an open window [25,75). Each window is
> physically created when the first record for it is processed.
>
> If you have the above windows and a record with ts=101 arrives, a new
> window [101,151) will be created. Window [0,50) will not be deleted yet,
> because retention is 100 and thus Streams guarantees that all records
> with ts >= 1 (= 101 - 100) are still processed correctly, and those
> records would fall into window [0,50).
>
> Thus, window [0,50) can be dropped once time advances to ts = 150, but
> not before that.
>
> -Matthias
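
[A small sketch of the arithmetic in the quoted explanation -- a hedged
illustration with the same example values (hopping windows of size 50
advancing by 25, retention 100), not code from the thread:

    // Which hopping windows does a timestamp fall into, and when may a
    // window be discarded given the current stream-time?
    long size = 50, advance = 25, retention = 100;

    long ts = 90;
    long firstStart = Math.max(0, ts - size + 1);                  // earliest start containing ts
    long start = ((firstStart + advance - 1) / advance) * advance; // align up to the advance grid
    for (; start <= ts; start += advance) {
        System.out.printf("ts=%d falls into window [%d,%d)%n", ts, start, start + size);
        // for ts=90 this prints [50,100) and [75,125)
    }

    // A window may be dropped once stream-time passes its end plus retention,
    // e.g. [0,50): droppable when stream-time >= 50 + 100 = 150.
    long streamTime = 150, windowStart = 0;
    boolean droppable = streamTime >= windowStart + size + retention;

This reproduces the numbers above: window [0,50) survives the record with
ts=101 and only becomes droppable once stream-time reaches 150.]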
>
>
> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> > Hi,
> > So is until for future or past?
> > Say I get the first record at t = 0, until is 100, and my window size is
> > 50, advancing by 25.
> > I understand it will create windows (0, 50), (25, 75), (50, 100)
> > Now at t = 101 it will drop
> > (0, 50), (25, 75), (50, 100) and create
> > (101, 150), (125, 175), (150, 200)
> >
> > Please confirm if this understanding is correct. It is not clear how it
> > will handle overlapping windows (75, 125) and (175, 225) and so on?
> >
> > The case that is still not clear is: say at t = 102 I get some message
> > with timestamp 99. What happens then?
> > Will the result be added to the previous aggregation of (50, 100) or
> > (75, 125), like it should?
> >
> > Or will it recreate the old window (50, 100), aggregate the value there,
> > and then drop it? This would result in a wrong aggregated value, as it
> > does not consider the previous aggregated values.
> >
> > So this is the pressing case I am not able to understand. Maybe I am
> > wrong in some basic understanding.
> >
> >
> > Next for
> > The parameter
> >> windowstore.changelog.additional.retention.ms
> >
> > How does this relate to the retention.ms param of the topic config?
> > I create the internal topic manually using, say, retention.ms=3600000.
> > In the next release (post kafka_2.10-0.10.0.1) we support delete for the
> > internal changelog topic as well, and I want it to be retained for, say,
> > just 1 hour.
> > So how does the above parameter interfere with this topic-level setting?
> > Or do I now just need to set the above config to 3600000 and not add
> > retention.ms=3600000
> > while creating the internal topic?
> > This is just another doubt remaining here.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> Sachin,
> >>
> >> There is no reason to have an .until() AND a .retain() -- just increase
> >> the value of .until()
> >>
> >> If you have a window of, let's say, 1h size and you set .until() also to
> >> 1h -- you can obviously not process any late-arriving data. If you set
> >> until() to 2h in this example, you can process data that is up to 1h
> >> delayed.
> >>
> >> So basically, the retention should always be larger than your window
> >> size.
> >>
> >> The parameter
> >>> windowstore.changelog.additional.retention.ms
> >>
> >> applies to changelog topics that back up window state stores. Those
> >> changelog topics are compacted. However, the used key does encode a
> >> window ID, and thus older data can never be cleaned up by compaction.
> >> Therefore, an additional retention time is applied to those topics, too.
> >> Thus, if an old window is not updated for this amount of time, it will
> >> eventually get deleted, preventing this topic from growing infinitely.
> >>
> >> The value will be determined by until(), i.e., whatever you specify in
> >> .until() will be used to set this parameter.
> >>
> >>
> >> -Matthias
> >>
> >> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> >>> Hi,
> >>> We are facing the exact problem as described by Matthias above.
> >>> We are keeping the default until, which is 1 day.
> >>>
> >>> Our records' timestamp extractor has a field which increases with
> >>> time. However, for short periods we cannot guarantee that the timestamp
> >>> always increases. So at the boundary, i.e. after 24 hrs, we can get
> >>> records which are beyond that window's retention period.
> >>>
> >>> Then it happens like it is mentioned above and our aggregation fails.
> >>>
> >>> So just to sum up: when we get a record at 24 h + 1 sec, it deletes the
> >>> older window, and since the new record belongs to the new window, that
> >>> window gets created.
> >>> Now when we get the next record at 24 h - 1 sec, since the older window
> >>> is dropped, it does not get aggregated in that bucket.
> >>>
> >>> I suggest we have another setting next to until, called retain, which
> >>> retains the older windows into the next window.
> >>>
> >>> I think at the stream window boundary level it should use the concept
> >>> of a sliding window. So we could define a window like
> >>>
> >>> TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 * 1000L)
> >>>     .until(7 * 24 * 3600 * 1000L).retain(900 * 1000L)
> >>>
> >>> So after 7 days it retains the data covered by windows in the last 15
> >>> minutes, which rolls the data in them over to the next window. This way
> >>> streams work continuously.
> >>>
> >>> Please let us know your thoughts on this.
> >>>
> >>> As another side question on this, there is a setting:
> >>>
> >>> windowstore.changelog.additional.retention.ms
> >>> It is not clear what it does. Is this the default for until?
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <
> matthias@confluent.io
> >>>
> >>> wrote:
> >>>
> >>>> Windows are created on demand, i.e., each time a new record arrives and
> >>>> there is no window yet for it, a new window will get created.
> >>>>
> >>>> Windows accept data until their retention time (that you can
> >>>> configure via .until()) has passed. Thus, you will have many windows
> >>>> open in parallel.
> >>>>
> >>>> If you read older data, it will just be put into the corresponding
> >>>> windows (as long as the window retention time did not pass). If a window
> >>>> was discarded already, a new window with this single (late-arriving)
> >>>> record will get created, the computation will be triggered, you get a
> >>>> result, and afterwards the window is deleted again (as its retention
> >>>> time has passed already).
> >>>>
> >>>> The retention time is driven by "stream-time", an internally tracked
> >>>> time that only progresses in the forward direction. It gets its value
> >>>> from the timestamps provided by the TimestampExtractor -- thus, by
> >>>> default it will be event-time.
> >>>>
> >>>> -Matthias
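
[For illustration, a minimal custom TimestampExtractor of the kind referenced
above -- a hedged sketch, not code from the thread (MyEvent and its
eventTimeMs field are invented; the single-argument extract() matches the
0.10.x interface):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class PayloadTimestampExtractor implements TimestampExtractor {
        // Made-up payload type carrying its own event time.
        public static class MyEvent { public long eventTimeMs; }

        @Override
        public long extract(ConsumerRecord<Object, Object> record) {
            if (record.value() instanceof MyEvent) {
                return ((MyEvent) record.value()).eventTimeMs; // drive stream-time from the payload
            }
            return record.timestamp(); // fall back to the record's own timestamp
        }
    }

It would be registered via the timestamp.extractor config
(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG in this Kafka version).]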
> >>>>
> >>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> >>>>> I've read this and still have more questions than answers. If my data
> >>>>> skips about (timewise), what determines when a given window will
> >>>>> start / stop accepting new data? What if I'm reading data from some
> >>>>> time ago?
> >>>>>
> >>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <
> >> matthias@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> Please have a look here:
> >>>>>>
> >>>>>> http://docs.confluent.io/current/streams/developer-
> >>>>>> guide.html#windowing-a-stream
> >>>>>>
> >>>>>> If you have further questions, just follow up :)
> >>>>>>
> >>>>>>
> >>>>>> -Matthias

Re: How does 'TimeWindows.of().until()' work?

Posted by Damian Guy <da...@gmail.com>.
Hi Sachin,

> windowstore.changelog.additional.retention.ms

>
> How does this relate to the retention.ms param of the topic config?
> I create the internal topic manually using, say, retention.ms=3600000.
> In the next release (post kafka_2.10-0.10.0.1) we support delete for the
> internal changelog topic as well, and I want it to be retained for, say,
> just 1 hour.
> So how does the above parameter interfere with this topic-level setting?
> Or do I now just need to set the above config to 3600000 and not add
> retention.ms=3600000
> while creating the internal topic?
>
>
This parameter is only used when creating the internal changelog topic for
a window store. When the streams library creates a changelog topic for a
window store, it sets the cleanup policy to compact,delete, so the changelog
topic is both compacted and deleted. For the delete side of it, retention.ms
is set to the value of TimeWindows.until() + the value of
windowstore.changelog.additional.retention.ms. So if you are retaining
windows for 1 day and you just used the default for the above property, then
retention.ms=172800000 (i.e., 2 days). Any segments older than this will be
deleted from the log.

Thanks,
Damian
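
[A minimal sketch of that arithmetic -- hedged, assuming the 1-day default
for windowstore.changelog.additional.retention.ms (variable names are made
up):

    import java.util.concurrent.TimeUnit;

    public class ChangelogRetentionMath {
        public static void main(String[] args) {
            long untilMs = TimeUnit.DAYS.toMillis(1);      // TimeWindows...until(1 day)
            long additionalMs = TimeUnit.DAYS.toMillis(1); // assumed default additional retention
            long retentionMs = untilMs + additionalMs;     // retention.ms on the changelog topic
            System.out.println("retention.ms = " + retentionMs); // 172800000 (2 days)
        }
    }]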

Re: How does 'TimeWindows.of().until()' work?

Posted by Damian Guy <da...@gmail.com>.
Yes, that is one of the methods. It will be available in the 0.10.2 release,
which is due at the beginning of February.

On Mon, 19 Dec 2016 at 12:17 Sachin Mittal <sj...@gmail.com> wrote:

> I believe you are talking about this method.
> public <W extends Window, T> KTable<Windowed<K>, T> aggregate(
>     final Initializer<T> initializer,
>     final Aggregator<K, V, T> aggregator,
>     final Windows<W> windows,
>     final StateStoreSupplier<WindowStore> storeSupplier)
>
> Will this API be part of the next release?
>
> I can go about using this; however, if we add some API to
> StateStoreSupplier to update the logConfig, then we can pass all the
> topic-level props as part of the streams config directly.
>
> Thanks
> Sachin
>
>
>
> On Mon, Dec 19, 2016 at 5:32 PM, Damian Guy <da...@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > I think we have a way of doing what you want already. If you create a
> > custom state store you can call the enableLogging method and pass in any
> > configuration parameters you want. For example:
> >
> > final StateStoreSupplier supplier = Stores.create("store")
> >         .withKeys(Serdes.String())
> >         .withValues(Serdes.String())
> >         .persistent()
> >         .enableLogging(Collections.singletonMap("retention.ms", "1000"))
> >         .build();
> >
> > You can then use the overloaded methods in the DSL to pass in the
> > StateStoreSupplier to your aggregates (trunk only)
> >
> >
> > On Mon, 19 Dec 2016 at 10:58 Sachin Mittal <sj...@gmail.com> wrote:
> >
> > > Hi,
> > > I am working towards adding topic configs as part of streams config.
> > > However I have run into an issue:
> > > The code flow is like this:
> > >
> > > KStreamBuilder builder = new KStreamBuilder();
> > > builder.stream(...)
> > > ...
> > > KafkaStreams streams = new KafkaStreams(builder, streamsProps);
> > > streams.start();
> > >
> > > So we can see we build the topology before building the streams.
> > > While building the topology it assigns the state stores.
> > > At that time no topic config props are available.
> > >
> > > So it creates the supplier with an empty topic config.
> > >
> > > Further, StateStoreSupplier has a method just to get the config, not to
> > > update it:
> > > Map<String, Object> logConfig()
> > >
> > > One way to implement this is to change this interface to be able to
> > > update the log config props too. Then, when the props are available to
> > > the streams instance, we update the topology builder's state stores
> > > with the updated config.
> > >
> > > The other way is to change the KStreamBuilder and make it pass the
> > > topic config.
> > > However, in the second approach we would be splitting the streams config
> > > into two parts.
> > >
> > > Let me know how one should proceed with this.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > > On Thu, Dec 15, 2016 at 2:27 PM, Matthias J. Sax <
> matthias@confluent.io>
> > > wrote:
> > >
> > > > I agree. We have already got multiple requests to add an API for
> > > > specifying topic parameters for internal topics... I am pretty sure we
> > > > will add it if time permits -- feel free to contribute this new feature!
> > > >
> > > > About changing the value of until: that does not work, as the
> > > > changelog topic configuration would not be updated.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 12/14/16 8:22 PM, Sachin Mittal wrote:
> > > > > Hi,
> > > > > I suggest including the topic config as part of the streams config
> > > > > properties, like we do for producer and consumer configs.
> > > > > The topic config supplied would be used for creating internal
> > > > > changelog topics, along with certain additional configs which are
> > > > > applied by default.
> > > > >
> > > > > This way we don't have to ever create internal topics manually.
> > > > >
> > > > > I had one doubt regarding until.
> > > > > Say I specify one value and run my streams app.
> > > > > Now I stop the app, specify a different value, and restart the app.
> > > > >
> > > > > Which retention value would the old (pre-existing) windows use? Would
> > > > > it be the older value or the new value?
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <
> > > matthias@confluent.io
> > > > >
> > > > > wrote:
> > > > >
> > > > >> Understood. Makes sense.
> > > > >>
> > > > >> For this, you should apply the Streams configs manually when
> > > > >> creating those topics. For the retention parameter, use the value
> > > > >> you specify in the corresponding .until() method.
> > > > >>
> > > > >>
> > > > >> -Matthias
> > > > >>
> > > > >>
> > > > >> On 12/14/16 10:08 AM, Sachin Mittal wrote:
> > > > >>> I was referring to the internal changelog topics. I had to create
> > > > >>> them manually because in some cases the message size of these
> > > > >>> topics was greater than the defaults used by Kafka Streams.
> > > > >>>
> > > > >>> I think someone in this group recommended creating these topics
> > > > >>> manually. I understand that it is better to have internal topics
> > > > >>> created by the streams app, and I will take a second look at these
> > > > >>> and see if that can be done.
> > > > >>>
> > > > >>> I just wanted to make sure which configs are applied to internal
> > > > >>> topics, in order to decide whether to avoid creating them manually.
> > > > >>>
> > > > >>> Thanks
> > > > >>> Sachin
> > > > >>>
> > > > >>>
> > > > >>> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <
> > > > matthias@confluent.io
> > > > >>>
> > > > >>> wrote:
> > > > >>>
> > > > >>>> I am wondering about "I create internal topic manually" -- which
> > > > >>>> topics do you refer to, in detail?
> > > > >>>>
> > > > >>>> Kafka Streams creates all kinds of internal topics with
> > > > >>>> auto-generated names. So it would be quite tricky to create all of
> > > > >>>> them manually (especially because you need to know those names in
> > > > >>>> advance).
> > > > >>>>
> > > > >>>> IIRC, if a topic does exist, Kafka Streams does not change its
> > > > >>>> configuration. Only if Kafka Streams itself creates a topic will it
> > > > >>>> specify certain config parameters at the topic-creation step.
> > > > >>>> specify certain config parameters on topic create step.
> > > > >>>>
> > > > >>>>
> > > > >>>> -Matthias
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> On 12/13/16 8:16 PM, Sachin Mittal wrote:
> > > > >>>>> Hi,
> > > > >>>>> Thanks for the explanation. This illustration makes it super
> > > > >>>>> easy to understand how until works. Perhaps we can update the
> > > > >>>>> wiki with this illustration.
> > > > >>>>> It is basically the retention time for a past window.
> > > > >>>>> I used to think until creates all the future windows for that
> > > > >>>>> period and, once time passes that, deletes all the past windows.
> > > > >>>>> However, until actually retains a window for the specified time.
> > > > >>>>> This makes so much more sense.
> > > > >>>>>
> > > > >>>>> I just had one pending query regarding:
> > > > >>>>>
> > > > >>>>>> windowstore.changelog.additional.retention.ms
> > > > >>>>>
> > > > >>>>> How does this relate to the retention.ms param of the topic
> > > > >>>>> config? I create the internal topic manually using, say,
> > > > >>>>> retention.ms=3600000.
> > > > >>>>> In the next release (post kafka_2.10-0.10.0.1) we support delete
> > > > >>>>> for the internal changelog topic as well, and I want it to be
> > > > >>>>> retained for, say, just 1 hour.
> > > > >>>>> So how does the above parameter interfere with this topic-level
> > > > >>>>> setting? Or do I now just need to set the above config to 3600000
> > > > >>>>> and not add retention.ms=3600000 while creating the internal
> > > > >>>>> topic?
> > > > >>>>>
> > > > >>>>> Thanks
> > > > >>>>> Sachin

Re: How does 'TimeWindows.of().until()' work?

Posted by Sachin Mittal <sj...@gmail.com>.
I believe you are talking about this method.
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(
    final Initializer<T> initializer,
    final Aggregator<K, V, T> aggregator,
    final Windows<W> windows,
    final StateStoreSupplier<WindowStore> storeSupplier)

Will this API be part of the next release?

I can go about using this; however, if we add some API to StateStoreSupplier
to update the logConfig, then we can pass all the topic-level props as part
of the streams config directly.

Thanks
Sachin
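
[For illustration, a hedged sketch of how the two pieces from this thread
could fit together on trunk/0.10.2 -- the store name, serdes, retention
value, and counting aggregator are invented, and the raw StateStoreSupplier
stands in loosely for the StateStoreSupplier<WindowStore> the quoted
signature expects:

    // Build a supplier with custom changelog configs, as in Damian's example...
    final StateStoreSupplier supplier = Stores.create("agg-store")
            .withKeys(Serdes.String())
            .withValues(Serdes.Long())
            .persistent()
            .enableLogging(Collections.singletonMap("retention.ms", "3600000"))
            .build();

    // ...and pass it to the overloaded windowed aggregate() quoted above,
    // given some KStream<String, String> stream from earlier in the topology.
    KTable<Windowed<String>, Long> agg = stream
            .groupByKey()
            .aggregate(() -> 0L,                        // Initializer
                       (key, value, acc) -> acc + 1L,   // Aggregator (a simple count)
                       TimeWindows.of(60 * 1000L).until(3600 * 1000L),
                       supplier);                       // custom StateStoreSupplier]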



On Mon, Dec 19, 2016 at 5:32 PM, Damian Guy <da...@gmail.com> wrote:

> Hi Sachin,
>
> I think we have a way of doing what you want already. If you create a
> custom state store you can call the enableLogging method and pass in any
> configuration parameters you want: For example:
>
> final StateStoreSupplier supplier = Stores.create("store")
>         .withKeys(Serdes.String())
>         .withValues(Serdes.String())
>         .persistent()
>         .enableLogging(Collections.singletonMap("retention.ms", "1000"))
>         .build();
>
> You can then use the overloaded methods in the DSL to pass in the
> StateStoreSupplier to your aggregates (trunk only)
>
>
> On Mon, 19 Dec 2016 at 10:58 Sachin Mittal <sj...@gmail.com> wrote:
>
> > Hi,
> > I am working towards adding topic configs as part of streams config.
> > However I have run into an issue:
> > Code flow is like this
> >
> > KStreamBuilder builder = new KStreamBuilder();
> > builder.stream(...)
> > ...
> > KafkaStreams streams = new KafkaStreams(builder, streamsProps);
> > streams.start();
> >
> > So we can see we build the topology before building the streams.
> > While building topology it assigns state store.
> > That time no topic config props are available.
> >
> > So it creates the supplier with empty topic config.
> >
> > Further StateStoreSupplier has method just to get the config and not to
> > update it.
> > Map<String, Object> logConfig()
> >
> > One way to implement this is change this interface to be able to update
> the
> > log config props too.
> > And we the props are available to streams we update the topology
> builder's
> > state stores too with updated config.
> >
> > Other way is to change the KStreamBuilder and make it pass the topic
> > config.
> > However in second approach we would be splitting the streams config into
> > two parts.
> >
> > Let me know how should one proceed with this.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Thu, Dec 15, 2016 at 2:27 PM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> > > I agree. We got already multiple request to add an API for specifying
> > > topic parameters for internal topic... I am pretty sure we will add it
> > > if time permits -- feel free to contribute this new feature!
> > >
> > > About chancing the value of until: that does not work, as the changelog
> > > topic configuration would not be updated.
> > >
> > >
> > > -Matthias
> > >
> > > On 12/14/16 8:22 PM, Sachin Mittal wrote:
> > > > Hi,
> > > > I suggest to include topic config as well as part of streams config
> > > > properties like we do for producer and consumer configs.
> > > > The topic config supplied would be used for creating internal
> changelog
> > > > topics along with certain additional configs which are applied by
> > > default.
> > > >
> > > > This way we don't have to ever create internal topics manually.
> > > >
> > > > I had one doubt regarding until.
> > > > Say I specify one value and run my streams app.
> > > > Now I stop the app, specify different value and re start the app.
> > > >
> > > > Which value for retain would the old (pre existing) windows use.
> Would
> > it
> > > > be the older value or the new value?
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > >
> > > > On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <
> > matthias@confluent.io
> > > >
> > > > wrote:
> > > >
> > > >> Understood. Makes sense.
> > > >>
> > > >> For this, you should apply Streams configs manually when creating
> > those
> > > >> topics. For retention parameter, use the value you specify in
> > > >> corresponding .until() method for it.
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >> On 12/14/16 10:08 AM, Sachin Mittal wrote:
> > > >>> I was referring to internal change log topic. I had to create them
> > > >> manually
> > > >>> because in some case the message size of these topic were greater
> > than
> > > >> the
> > > >>> default ones used by kafka streams.
> > > >>>
> > > >>> I think someone in this group recommended to create these topic
> > > >> manually. I
> > > >>> understand that it is better to have internal topics created by
> > streams
> > > >> app
> > > >>> and I will take a second look at these and see if that can be done.
> > > >>>
> > > >>> I just wanted to make sure what all configs are applied to internal
> > > >> topics
> > > >>> in order to decide to avoid them creating manually.
> > > >>>
> > > >>> Thanks
> > > >>> Sachin
> > > >>>
> > > >>>
> > > >>> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <
> > > matthias@confluent.io
> > > >>>
> > > >>> wrote:
> > > >>>
> > > >>>> I am wondering about "I create internal topic manually" -- which
> > > topics
> > > >>>> do you refer in detail?
> > > >>>>
> > > >>>> Kafka Streams create all kind of internal topics with
> auto-generated
> > > >>>> names. So it would be quite tricky to create all of them manually
> > > >>>> (especially because you need to know those name in advance).
> > > >>>>
> > > >>>> IRRC, if a topic does exist, Kafka Streams does no change it's
> > > >>>> configuration. Only if Kafka Streams does create a topic, it will
> > > >>>> specify certain config parameters on topic create step.
> > > >>>>
> > > >>>>
> > > >>>> -Matthias
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> On 12/13/16 8:16 PM, Sachin Mittal wrote:
> > > >>>>> Hi,
> > > >>>>> Thanks for the explanation. This illustration makes it super easy
> > to
> > > >>>>> understand how until works. Perhaps we can update the wiki with
> > this
> > > >>>>> illustration.
> > > >>>>> It is basically the retention time for a past window.
> > > >>>>> I used to think until creates all the future windows for that
> > period
> > > >> and
> > > >>>>> when time passes that it used to delete all the past windows.
> > However
> > > >>>>> actually until retains a window for specified time. This makes so
> > > much
> > > >>>> more
> > > >>>>> sense.
> > > >>>>>
> > > >>>>> I just had one pending query regarding:
> > > >>>>>
> > > >>>>>> windowstore.changelog.additional.retention.ms
> > > >>>>>
> > > >>>>> How does this relate to rentention.ms param of topic config?
> > > >>>>> I create internal topic manually using say rentention.ms
> =3600000.
> > > >>>>> In next release (post kafka_2.10-0.10.0.1) since we support
> delete
> > of
> > > >>>>> internal changelog topic as well and I want it to be retained for
> > say
> > > >>>> just
> > > >>>>> 1 hour.
> > > >>>>> So how does that above parameter interfere with this topic level
> > > >> setting.
> > > >>>>> Or now I just need to set above config as 3600000 and not add
> > > >>>>> rentention.ms=3600000
> > > >>>>> while creating internal topic.
> > > >>>>>
> > > >>>>> Thanks
> > > >>>>> Sachin
> > > >>>>>
> > > >>>>>
> > > >>>>> On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax <
> > > >> matthias@confluent.io
> > > >>>>>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> First, windows are only created if there is actual data for a
> > > window.
> > > >> So
> > > >>>>>> you get windows [0, 50), [25, 75), [50, 100) only if there are
> > > record
> > > >>>>>> falling into each window (btw: window start-time is inclusive
> > while
> > > >>>>>> window end time is exclusive). If you have only 2 record with
> lets
> > > say
> > > >>>>>> ts=20 and ts=90 you will not have an open window [25,75). Each
> > > window
> > > >> is
> > > >>>>>> physically created each time the first record for it is
> processed.
> > > >>>>>>
> > > >>>>>> If you have above 4 windows and a record with ts=101 arrives, a
> > new
> > > >>>>>> window [101,151) will be created. Window [0,50) will not be
> > deleted
> > > >> yet,
> > > >>>>>> because retention is 100 and thus Streams guarantees that all
> > record
> > > >>>>>> with ts >= 1 (= 101 - 100) are still processed correctly and
> those
> > > >>>>>> records would fall into window [0,50).
> > > >>>>>>
> > > >>>>>> Thus, window [0,50) can be dropped, if time advanced to TS =
> 150,
> > > but
> > > >>>>>> not before that.
> > > >>>>>>
> > > >>>>>> -Matthias
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> > > >>>>>>> Hi,
> > > >>>>>>> So is until for future or past?
> > > >>>>>>> Say I get first record at t = 0 and until is 100 and my window
> > size
> > > >> is
> > > >>>> 50
> > > >>>>>>> advance by 25.
> > > >>>>>>> I understand it will create windows (0, 50), (25, 75), (50,
> 100)
> > > >>>>>>> Now at t = 101 it will drop
> > > >>>>>>> (0, 50), (25, 75), (50, 100) and create
> > > >>>>>>> (101, 150), (125, 175), (150, 200)
> > > >>>>>>>
> > > >>>>>>> Please confirm if this understanding us correct. It is not
> clear
> > > how
> > > >> it
> > > >>>>>>> will handle overlapping windows (75, 125) and (175, 225) and so
> > on?
> > > >>>>>>>
> > > >>>>>>> What case is not clear again is that at say t = 102 I get some
> > > >> message
> > > >>>>>> with
> > > >>>>>>> timestamp 99. What happens then?
> > > >>>>>>> Will the result added to previous aggregation of (50, 100) or
> > (75,
> > > >>>> 125),
> > > >>>>>>> like it should.
> > > >>>>>>>
> > > >>>>>>> Or it will recreate the old window (50, 100) and aggregate the
> > > value
> > > >>>>>> there
> > > >>>>>>> and then drop it. This would result is wrong aggregated value,
> as
> > > it
> > > >>>> does
> > > >>>>>>> not consider the previous aggregated values.
> > > >>>>>>>
> > > >>>>>>> So this is the pressing case I am not able to understand.
> Maybe I
> > > am
> > > >>>>>> wrong
> > > >>>>>>> at some basic understanding.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Next for
> > > >>>>>>> The parameter
> > > >>>>>>>> windowstore.changelog.additional.retention.ms
> > > >>>>>>>
> > > >>>>>>> How does this relate to rentention.ms param of topic config?
> > > >>>>>>> I create internal topic manually using say rentention.ms
> > =3600000.
> > > >>>>>>> In next release (post kafka_2.10-0.10.0.1) since we support
> > delete
> > > of
> > > >>>>>>> internal changelog topic as well and I want it to be retained
> for
> > > say
> > > >>>>>> just
> > > >>>>>>> 1 hour.
> > > >>>>>>> So how does that above parameter interfere with this topic
> level
> > > >>>> setting.
> > > >>>>>>> Or now I just need to set above config as 3600000 and not add
> > > >>>>>>> rentention.ms=3600000
> > > >>>>>>> while creating internal topic.
> > > >>>>>>> This is just another doubt remaining here.
> > > >>>>>>>
> > > >>>>>>> Thanks
> > > >>>>>>> Sachin
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <
> > > >>>> matthias@confluent.io>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>>> Sachin,
> > > >>>>>>>>
> > > >>>>>>>> There is no reason to have an .until() AND a .retain() -- just
> > > >>>> increase
> > > >>>>>>>> the value of .until()
> > > >>>>>>>>
> > > >>>>>>>> If you have a window of let's say 1h size and you set .until()
> > > also
> > > >> to
> > > >>>>>>>> 1h -- you can obviously not process any late arriving data. If
> > you
> > > >> set
> > > >>>>>>>> until() to 2h is this example, you can process data that is up
> > to
> > > 1h
> > > >>>>>>>> delayed.
> > > >>>>>>>>
> > > >>>>>>>> So basically, the retention should always be larger than you
> > > window
> > > >>>>>> size.
> > > >>>>>>>>
> > > >>>>>>>> The parameter
> > > >>>>>>>>> windowstore.changelog.additional.retention.ms
> > > >>>>>>>>
> > > >>>>>>>> is applies to changelog topics that backup window state
> stores.
> > > >> Those
> > > >>>>>>>> changelog topics are compacted. However, the used key does
> > encode
> > > an
> > > >>>>>>>> window ID and thus older data can never be cleaned up by
> > > compaction.
> > > >>>>>>>> Therefore, an additional retention time is applied to those
> > > topics,
> > > >>>> too.
> > > >>>>>>>> Thus, if an old window is not updated for this amount of time,
> > it
> > > >> will
> > > >>>>>>>> get deleted eventually preventing this topic to grown
> > infinitely.
> > > >>>>>>>>
> > > >>>>>>>> The value will be determined by until(), i.e., whatever you
> > > specify
> > > >> in
> > > >>>>>>>> .until() will be used to set this parameter.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> -Matthias
> > > >>>>>>>>
> > > >>>>>>>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> > > >>>>>>>>> Hi,
> > > >>>>>>>>> We are facing the exact problem as described by Matthias
> above.
> > > >>>>>>>>> We are keeping default until which is 1 day.
> > > >>>>>>>>>
> > > >>>>>>>>> Our record's times tamp extractor has a field which increases
> > > with
> > > >>>>>> time.
> > > >>>>>>>>> However for short time we cannot guarantee the time stamp is
> > > always
> > > >>>>>>>>> increases. So at the boundary ie after 24 hrs we can get
> > records
> > > >>>> which
> > > >>>>>>>> are
> > > >>>>>>>>> beyond that windows retention period.
> > > >>>>>>>>>
> > > >>>>>>>>> Then it happens like it is mentioned above and our
> aggregation
> > > >> fails.
> > > >>>>>>>>>
> > > >>>>>>>>> So just to sum up when we get record
> > > >>>>>>>>> 24h + 1 sec (it deletes older window and since the new record
> > > >> belongs
> > > >>>>>> to
> > > >>>>>>>>> the new window its gets created)
> > > >>>>>>>>> Now when we get next record of 24 hs - 1 sec since older
> window
> > > is
> > > >>>>>>>> dropped
> > > >>>>>>>>> it does not get aggregated in that bucket.
> > > >>>>>>>>>
> > > >>>>>>>>> I suggest we have another setting next to until call retain
> > which
> > > >>>>>> retains
> > > >>>>>>>>> the older windows into next window.
> > > >>>>>>>>>
> > > >>>>>>>>> I think at stream window boundary level it should use a
> concept
> > > of
> > > >>>>>>>> sliding
> > > >>>>>>>>> window. So we can define window like
> > > >>>>>>>>>
> > > >>>>>>>>> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
> > > >>>>>>>> 1000l).untill(7
> > > >>>>>>>>> * 24 * 3600 * 1000l).retain(900 * 1000l)
> > > >>>>>>>>>
> > > >>>>>>>>> So after 7 days it retains the data covered by windows in
> last
> > 15
> > > >>>>>> minutes
> > > >>>>>>>>> which rolls over the data in them to next window. This way
> > > streams
> > > >>>> work
> > > >>>>>>>>> continuously.
> > > >>>>>>>>>
> > > >>>>>>>>> Please let us know your thoughts on this.
> > > >>>>>>>>>
> > > >>>>>>>>> On another side question on this there is a setting:
> > > >>>>>>>>>
> > > >>>>>>>>> windowstore.changelog.additional.retention.ms
> > > >>>>>>>>> I is not clear what is does. Is this the default for until?
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks
> > > >>>>>>>>> Sachin
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <
> > > >>>>>> matthias@confluent.io
> > > >>>>>>>>>
> > > >>>>>>>>> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>>> Windows are created on demand, ie, each time a new record
> > > arrives
> > > >>>> and
> > > >>>>>>>>>> there is no window yet for it, a new window will get
> created.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Windows are accepting data until their retention time (that
> > you
> > > >> can
> > > >>>>>>>>>> configure via .until()) passed. Thus, you will have many
> > windows
> > > >>>> being
> > > >>>>>>>>>> open in parallel.
> > > >>>>>>>>>>
> > > >>>>>>>>>> If you read older data, they will just be put into the
> > > >> corresponding
> > > >>>>>>>>>> windows (as long as window retention time did not pass). If
> a
> > > >> window
> > > >>>>>> was
> > > >>>>>>>>>> discarded already, a new window with this single (later
> > > arriving)
> > > >>>>>> record
> > > >>>>>>>>>> will get created, the computation will be triggered, you
> get a
> > > >>>> result,
> > > >>>>>>>>>> and afterwards the window is deleted again (as its
> retention
> > > time
> > > >>>>>>>>>> passed already).
> > > >>>>>>>>>>
> > > >>>>>>>>>> The retention time is driven by "stream-time", an internally
> > > tracked
> > > >>>>>> time
> > > >>>>>>>>>> that only progresses in the forward direction. It gets its value
> > from
> > > >> the
> > > >>>>>>>>>> timestamps provided by TimestampExtractor -- thus, per
> default
> > > it
> > > >>>> will
> > > >>>>>>>>>> be event-time.
> > > >>>>>>>>>>
> > > >>>>>>>>>> -Matthias
> > > >>>>>>>>>>
> > > >>>>>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> > > >>>>>>>>>>> I've read this and still have more questions than answers.
> If
> > > my
> > > >>>> data
> > > >>>>>>>>>> skips
> > > >>>>>>>>>>> about (timewise) what determines when a given window will
> > > start /
> > > >>>>>> stop
> > > >>>>>>>>>>> accepting new data? What if Im reading data from some time
> > ago?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <
> > > >>>>>>>> matthias@confluent.io>
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Please have a look here:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> http://docs.confluent.io/current/streams/developer-
> > > >>>>>>>>>>>> guide.html#windowing-a-stream
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> If you have further question, just follow up :)
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> -Matthias
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> > > >>>>>>>>>>>>> Ive added the 'until()' clause to some aggregation steps
> > and
> > > >> it's
> > > >>>>>>>>>> working
> > > >>>>>>>>>>>>> wonders for keeping the size of the state store in useful
> > > >>>>>>>> boundaries...
> > > >>>>>>>>>>>> But
> > > >>>>>>>>>>>>> Im not 100% clear on how it works.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> What is implied by the '.until()' clause? What determines
> > > when
> > > >> to
> > > >>>>>>>> stop
> > > >>>>>>>>>>>>> receiving further data - is it clock time (since the
> window
> > > was
> > > >>>>>>>>>> created)?
> > > >>>>>>>>>>>>> It seems problematic for it to refer to EventTime as this
> > may
> > > >>>>>> bounce
> > > >>>>>>>>>> all
> > > >>>>>>>>>>>>> over the place. For non-overlapping windows a given
> record
> > > can
> > > >>>> only
> > > >>>>>>>>>> fall
> > > >>>>>>>>>>>>> into a single aggregation period - so when would a value
> > get
> > > >>>>>>>> discarded?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 *
> > > >>>>>>>>>>>> 1000L).until(10 *
> > > >>>>>>>>>>>>> 1000L))'  - but what is this accomplishing?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>
> > > >>
> > > >>
> > > >
> > >
> > >
> >
>

Re: How does 'TimeWindows.of().until()' work?

Posted by Damian Guy <da...@gmail.com>.
Hi Sachin,

I think we have a way of doing what you want already. If you create a
custom state store, you can call the enableLogging method and pass in any
configuration parameters you want. For example:

final StateStoreSupplier supplier = Stores.create("store")
        .withKeys(Serdes.String())
        .withValues(Serdes.String())
        .persistent()
        .enableLogging(Collections.singletonMap("retention.ms", "1000"))
        .build();

You can then use the overloaded methods in the DSL to pass in the
StateStoreSupplier to your aggregates (trunk only).
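
For illustration, here is a minimal sketch of how that wiring might look.
The aggregate() overload that accepts a StateStoreSupplier is trunk-only at
this point, so treat the exact signature below as an assumption:

// Hypothetical usage sketch; "supplier" is the StateStoreSupplier built above.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> input =
        builder.stream(Serdes.String(), Serdes.String(), "input-topic");

input.groupByKey(Serdes.String(), Serdes.String())
     .aggregate(() -> "",                          // initializer
                (key, value, agg) -> agg + value,  // aggregator: concat values
                supplier);                         // custom store, custom log config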


On Mon, 19 Dec 2016 at 10:58 Sachin Mittal <sj...@gmail.com> wrote:

> Hi,
> I am working towards adding topic configs as part of streams config.
> However I have run into an issue:
> Code flow is like this
>
> KStreamBuilder builder = new KStreamBuilder();
> builder.stream(...)
> ...
> KafkaStreams streams = new KafkaStreams(builder, streamsProps);
> streams.start();
>
> So we can see we build the topology before building the streams.
> While building topology it assigns state store.
> That time no topic config props are available.
>
> So it creates the supplier with empty topic config.
>
> Further StateStoreSupplier has method just to get the config and not to
> update it.
> Map<String, Object> logConfig()
>
> One way to implement this is change this interface to be able to update the
> log config props too.
> And we the props are available to streams we update the topology builder's
> state stores too with updated config.
>
> Other way is to change the KStreamBuilder and make it pass the topic
> config.
> However in second approach we would be splitting the streams config into
> two parts.
>
> Let me know how should one proceed with this.
>
> Thanks
> Sachin
>
>
>
> On Thu, Dec 15, 2016 at 2:27 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > I agree. We got already multiple request to add an API for specifying
> > topic parameters for internal topic... I am pretty sure we will add it
> > if time permits -- feel free to contribute this new feature!
> >
> > About changing the value of until: that does not work, as the changelog
> > topic configuration would not be updated.
> >
> >
> > -Matthias
> >
> > On 12/14/16 8:22 PM, Sachin Mittal wrote:
> > > Hi,
> > > I suggest to include topic config as well as part of streams config
> > > properties like we do for producer and consumer configs.
> > > The topic config supplied would be used for creating internal changelog
> > > topics along with certain additional configs which are applied by
> > default.
> > >
> > > This way we don't have to ever create internal topics manually.
> > >
> > > I had one doubt regarding until.
> > > Say I specify one value and run my streams app.
> > > Now I stop the app, specify different value and re start the app.
> > >
> > > Which value for retain would the old (pre existing) windows use. Would
> it
> > > be the older value or the new value?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > > On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <
> matthias@confluent.io
> > >
> > > wrote:
> > >
> > >> Understood. Makes sense.
> > >>
> > >> For this, you should apply Streams configs manually when creating
> those
> > >> topics. For retention parameter, use the value you specify in
> > >> corresponding .until() method for it.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 12/14/16 10:08 AM, Sachin Mittal wrote:
> > >>> I was referring to internal change log topic. I had to create them
> > >> manually
> > >>> because in some case the message size of these topic were greater
> than
> > >> the
> > >>> default ones used by kafka streams.
> > >>>
> > >>> I think someone in this group recommended to create these topic
> > >> manually. I
> > >>> understand that it is better to have internal topics created by
> streams
> > >> app
> > >>> and I will take a second look at these and see if that can be done.
> > >>>
> > >>> I just wanted to make sure what all configs are applied to internal
> > >> topics
> > >>> in order to decide to avoid them creating manually.
> > >>>
> > >>> Thanks
> > >>> Sachin
> > >>>
> > >>>
> > >>> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <
> > matthias@confluent.io
> > >>>
> > >>> wrote:
> > >>>
> > >>>> I am wondering about "I create internal topic manually" -- which
> > topics
> > >>>> do you refer in detail?
> > >>>>
> > >>>> Kafka Streams create all kind of internal topics with auto-generated
> > >>>> names. So it would be quite tricky to create all of them manually
> > >>>> (especially because you need to know those name in advance).
> > >>>>
> > >>>> IIRC, if a topic does exist, Kafka Streams does not change its
> > >>>> configuration. Only if Kafka Streams does create a topic, it will
> > >>>> specify certain config parameters on topic create step.
> > >>>>
> > >>>>
> > >>>> -Matthias
> > >>>>
> > >>>>
> > >>>>
> > >>>> On 12/13/16 8:16 PM, Sachin Mittal wrote:
> > >>>>> Hi,
> > >>>>> Thanks for the explanation. This illustration makes it super easy
> to
> > >>>>> understand how until works. Perhaps we can update the wiki with
> this
> > >>>>> illustration.
> > >>>>> It is basically the retention time for a past window.
> > >>>>> I used to think until creates all the future windows for that
> period
> > >> and
> > >>>>> when time passes that it used to delete all the past windows.
> However
> > >>>>> actually until retains a window for specified time. This makes so
> > much
> > >>>> more
> > >>>>> sense.
> > >>>>>
> > >>>>> I just had one pending query regarding:
> > >>>>>
> > >>>>>> windowstore.changelog.additional.retention.ms
> > >>>>>
> > >>>>> How does this relate to the retention.ms param of topic config?
> > >>>>> I create internal topics manually using, say, retention.ms=3600000.
> > >>>>> In next release (post kafka_2.10-0.10.0.1) since we support delete
> of
> > >>>>> internal changelog topic as well and I want it to be retained for
> say
> > >>>> just
> > >>>>> 1 hour.
> > >>>>> So how does that above parameter interfere with this topic level
> > >> setting.
> > >>>>> Or now I just need to set above config as 3600000 and not add
> > >>>>> retention.ms=3600000
> > >>>>> while creating internal topic.
> > >>>>>
> > >>>>> Thanks
> > >>>>> Sachin
> > >>>>>
> > >>>>>
> > >>>>> On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax <
> > >> matthias@confluent.io
> > >>>>>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> First, windows are only created if there is actual data for a
> > window.
> > >> So
> > >>>>>> you get windows [0, 50), [25, 75), [50, 100) only if there are
> > record
> > >>>>>> falling into each window (btw: window start-time is inclusive
> while
> > >>>>>> window end time is exclusive). If you have only 2 record with lets
> > say
> > >>>>>> ts=20 and ts=90 you will not have an open window [25,75). Each
> > window
> > >> is
> > >>>>>> physically created each time the first record for it is processed.
> > >>>>>>
> > >>>>>> If you have above 4 windows and a record with ts=101 arrives, a
> new
> > >>>>>> window [101,151) will be created. Window [0,50) will not be
> deleted
> > >> yet,
> > >>>>>> because retention is 100 and thus Streams guarantees that all
> record
> > >>>>>> with ts >= 1 (= 101 - 100) are still processed correctly and those
> > >>>>>> records would fall into window [0,50).
> > >>>>>>
> > >>>>>> Thus, window [0,50) can be dropped, if time advanced to TS = 150,
> > but
> > >>>>>> not before that.
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>>
> > >>>>>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> > >>>>>>> Hi,
> > >>>>>>> So is until for future or past?
> > >>>>>>> Say I get first record at t = 0 and until is 100 and my window
> size
> > >> is
> > >>>> 50
> > >>>>>>> advance by 25.
> > >>>>>>> I understand it will create windows (0, 50), (25, 75), (50, 100)
> > >>>>>>> Now at t = 101 it will drop
> > >>>>>>> (0, 50), (25, 75), (50, 100) and create
> > >>>>>>> (101, 150), (125, 175), (150, 200)
> > >>>>>>>
> > >>>>>>> Please confirm if this understanding is correct. It is not clear
> > how
> > >> it
> > >>>>>>> will handle overlapping windows (75, 125) and (175, 225) and so
> on?
> > >>>>>>>
> > >>>>>>> What case is not clear again is that at say t = 102 I get some
> > >> message
> > >>>>>> with
> > >>>>>>> timestamp 99. What happens then?
> > >>>>>>> Will the result be added to the previous aggregation of (50, 100) or
> (75,
> > >>>> 125),
> > >>>>>>> like it should.
> > >>>>>>>
> > >>>>>>> Or it will recreate the old window (50, 100) and aggregate the
> > value
> > >>>>>> there
> > >>>>>>> and then drop it. This would result in a wrong aggregated value, as
> > it
> > >>>> does
> > >>>>>>> not consider the previous aggregated values.
> > >>>>>>>
> > >>>>>>> So this is the pressing case I am not able to understand. Maybe I
> > am
> > >>>>>> wrong
> > >>>>>>> at some basic understanding.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> Next for
> > >>>>>>> The parameter
> > >>>>>>>> windowstore.changelog.additional.retention.ms
> > >>>>>>>
> > >>>>>>> How does this relate to the retention.ms param of topic config?
> > >>>>>>> I create internal topics manually using, say, retention.ms
> =3600000.
> > >>>>>>> In next release (post kafka_2.10-0.10.0.1) since we support
> delete
> > of
> > >>>>>>> internal changelog topic as well and I want it to be retained for
> > say
> > >>>>>> just
> > >>>>>>> 1 hour.
> > >>>>>>> So how does that above parameter interfere with this topic level
> > >>>> setting.
> > >>>>>>> Or now I just need to set above config as 3600000 and not add
> > >>>>>>> retention.ms=3600000
> > >>>>>>> while creating internal topic.
> > >>>>>>> This is just another doubt remaining here.
> > >>>>>>>
> > >>>>>>> Thanks
> > >>>>>>> Sachin
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <
> > >>>> matthias@confluent.io>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Sachin,
> > >>>>>>>>
> > >>>>>>>> There is no reason to have an .until() AND a .retain() -- just
> > >>>> increase
> > >>>>>>>> the value of .until()
> > >>>>>>>>
> > >>>>>>>> If you have a window of let's say 1h size and you set .until()
> > also
> > >> to
> > >>>>>>>> 1h -- you can obviously not process any late arriving data. If
> you
> > >> set
> > >>>>>>>> until() to 2h in this example, you can process data that is up
> to
> > 1h
> > >>>>>>>> delayed.
> > >>>>>>>>
> > >>>>>>>> So basically, the retention should always be larger than your
> > window
> > >>>>>> size.
> > >>>>>>>>
> > >>>>>>>> The parameter
> > >>>>>>>>> windowstore.changelog.additional.retention.ms
> > >>>>>>>>
> > >>>>>>>> applies to changelog topics that back up window state stores.
> > >> Those
> > >>>>>>>> changelog topics are compacted. However, the used key does
> encode
> > an
> > >>>>>>>> window ID and thus older data can never be cleaned up by
> > compaction.
> > >>>>>>>> Therefore, an additional retention time is applied to those
> > topics,
> > >>>> too.
> > >>>>>>>> Thus, if an old window is not updated for this amount of time,
> it
> > >> will
> > >>>>>>>> get deleted eventually, preventing this topic from growing
> infinitely.
> > >>>>>>>>
> > >>>>>>>> The value will be determined by until(), i.e., whatever you
> > specify
> > >> in
> > >>>>>>>> .until() will be used to set this parameter.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> -Matthias
> > >>>>>>>>
> > >>>>>>>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> > >>>>>>>>> Hi,
> > >>>>>>>>> We are facing the exact problem as described by Matthias above.
> > >>>>>>>>> We are keeping default until which is 1 day.
> > >>>>>>>>>
> > >>>>>>>>> Our record's timestamp extractor has a field which increases
> > with
> > >>>>>> time.
> > >>>>>>>>> However, for a short time we cannot guarantee the timestamp
> > always
> > >>>>>>>>> increases. So at the boundary, i.e. after 24 hrs, we can get
> records
> > >>>> which
> > >>>>>>>> are
> > >>>>>>>>> beyond that window's retention period.
> > >>>>>>>>>
> > >>>>>>>>> Then it happens like it is mentioned above and our aggregation
> > >> fails.
> > >>>>>>>>>
> > >>>>>>>>> So just to sum up: when we get a record at
> > >>>>>>>>> 24h + 1 sec (it deletes older window and since the new record
> > >> belongs
> > >>>>>> to
> > >>>>>>>>> the new window it gets created)
> > >>>>>>>>> Now when we get the next record at 24 hrs - 1 sec, since the older window
> > is
> > >>>>>>>> dropped
> > >>>>>>>>> it does not get aggregated in that bucket.
> > >>>>>>>>>
> > >>>>>>>>> I suggest we have another setting next to until called retain
> which
> > >>>>>> retains
> > >>>>>>>>> the older windows into next window.
> > >>>>>>>>>
> > >>>>>>>>> I think at stream window boundary level it should use a concept
> > of
> > >>>>>>>> sliding
> > >>>>>>>>> window. So we can define window like
> > >>>>>>>>>
> > >>>>>>>>> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
> > >>>>>>>> 1000l).until(7
> > >>>>>>>>> * 24 * 3600 * 1000l).retain(900 * 1000l)
> > >>>>>>>>>
> > >>>>>>>>> So after 7 days it retains the data covered by windows in last
> 15
> > >>>>>> minutes
> > >>>>>>>>> which rolls over the data in them to next window. This way
> > streams
> > >>>> work
> > >>>>>>>>> continuously.
> > >>>>>>>>>
> > >>>>>>>>> Please let us know your thoughts on this.
> > >>>>>>>>>
> > >>>>>>>>> On another side question on this there is a setting:
> > >>>>>>>>>
> > >>>>>>>>> windowstore.changelog.additional.retention.ms
> > >>>>>>>>> It is not clear what it does. Is this the default for until?
> > >>>>>>>>>
> > >>>>>>>>> Thanks
> > >>>>>>>>> Sachin
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <
> > >>>>>> matthias@confluent.io
> > >>>>>>>>>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Windows are created on demand, ie, each time a new record
> > arrives
> > >>>> and
> > >>>>>>>>>> there is no window yet for it, a new window will get created.
> > >>>>>>>>>>
> > >>>>>>>>>> Windows are accepting data until their retention time (that
> you
> > >> can
> > >>>>>>>>>> configure via .until()) passed. Thus, you will have many
> windows
> > >>>> being
> > >>>>>>>>>> open in parallel.
> > >>>>>>>>>>
> > >>>>>>>>>> If you read older data, they will just be put into the
> > >> corresponding
> > >>>>>>>>>> windows (as long as window retention time did not pass). If a
> > >> window
> > >>>>>> was
> > >>>>>>>>>> discarded already, a new window with this single (later
> > arriving)
> > >>>>>> record
> > >>>>>>>>>> will get created, the computation will be triggered, you get a
> > >>>> result,
> > >>>>>>>>>> and afterwards the window is deleted again (as its retention
> > time
> > >>>>>>>>>> passed already).
> > >>>>>>>>>>
> > >>>>>>>>>> The retention time is driven by "stream-time", an internally
> > tracked
> > >>>>>> time
> > >>>>>>>>>> that only progresses in the forward direction. It gets its value
> from
> > >> the
> > >>>>>>>>>> timestamps provided by TimestampExtractor -- thus, per default
> > it
> > >>>> will
> > >>>>>>>>>> be event-time.
> > >>>>>>>>>>
> > >>>>>>>>>> -Matthias
> > >>>>>>>>>>
> > >>>>>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> > >>>>>>>>>>> I've read this and still have more questions than answers. If
> > my
> > >>>> data
> > >>>>>>>>>> skips
> > >>>>>>>>>>> about (timewise) what determines when a given window will
> > start /
> > >>>>>> stop
> > >>>>>>>>>>> accepting new data? What if Im reading data from some time
> ago?
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <
> > >>>>>>>> matthias@confluent.io>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Please have a look here:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> http://docs.confluent.io/current/streams/developer-
> > >>>>>>>>>>>> guide.html#windowing-a-stream
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> If you have further question, just follow up :)
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> -Matthias
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> > >>>>>>>>>>>>> Ive added the 'until()' clause to some aggregation steps
> and
> > >> it's
> > >>>>>>>>>> working
> > >>>>>>>>>>>>> wonders for keeping the size of the state store in useful
> > >>>>>>>> boundaries...
> > >>>>>>>>>>>> But
> > >>>>>>>>>>>>> Im not 100% clear on how it works.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> What is implied by the '.until()' clause? What determines
> > when
> > >> to
> > >>>>>>>> stop
> > >>>>>>>>>>>>> receiving further data - is it clock time (since the window
> > was
> > >>>>>>>>>> created)?
> > >>>>>>>>>>>>> It seems problematic for it to refer to EventTime as this
> may
> > >>>>>> bounce
> > >>>>>>>>>> all
> > >>>>>>>>>>>>> over the place. For non-overlapping windows a given record
> > can
> > >>>> only
> > >>>>>>>>>> fall
> > >>>>>>>>>>>>> into a single aggregation period - so when would a value
> get
> > >>>>>>>> discarded?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 *
> > >>>>>>>>>>>> 1000L).until(10 *
> > >>>>>>>>>>>>> 1000L))'  - but what is this accomplishing?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
> >
>

Re: How does 'TimeWindows.of().until()' work?

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
I am working towards adding topic configs as part of the streams config.
However, I have run into an issue. The code flow is like this:

KStreamBuilder builder = new KStreamBuilder();
builder.stream(...)
...
KafkaStreams streams = new KafkaStreams(builder, streamsProps);
streams.start();

So we can see that we build the topology before building the streams
instance. While building the topology it assigns the state stores, and at
that time no topic config props are available.

So it creates the supplier with an empty topic config.

Further, StateStoreSupplier only has a method to get the config, not to
update it:

Map<String, Object> logConfig()

One way to implement this is to change the interface so that the log config
props can be updated too. Then, once the props are available to the streams
instance, we update the topology builder's state stores with the updated
config.
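
As a rough sketch of that first approach (updateLogConfig below is purely a
proposal; it does not exist on the current interface):

// Existing interface plus the proposed mutator (sketch only).
public interface StateStoreSupplier {
    String name();
    StateStore get();
    Map<String, Object> logConfig();
    // proposed: let the streams instance push the topic configs into
    // the already-built state store suppliers once the props are known
    void updateLogConfig(Map<String, Object> config);
}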

The other way is to change KStreamBuilder and make it pass the topic config.
However, in this second approach we would be splitting the streams config
into two parts.

Let me know how one should proceed with this.

Thanks
Sachin



On Thu, Dec 15, 2016 at 2:27 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> I agree. We got already multiple request to add an API for specifying
> topic parameters for internal topic... I am pretty sure we will add it
> if time permits -- feel free to contribute this new feature!
>
> About changing the value of until: that does not work, as the changelog
> topic configuration would not be updated.
>
>
> -Matthias
>
> On 12/14/16 8:22 PM, Sachin Mittal wrote:
> > Hi,
> > I suggest to include topic config as well as part of streams config
> > properties like we do for producer and consumer configs.
> > The topic config supplied would be used for creating internal changelog
> > topics along with certain additional configs which are applied by
> default.
> >
> > This way we don't have to ever create internal topics manually.
> >
> > I had one doubt regarding until.
> > Say I specify one value and run my streams app.
> > Now I stop the app, specify different value and re start the app.
> >
> > Which value for retain would the old (pre existing) windows use. Would it
> > be the older value or the new value?
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <matthias@confluent.io
> >
> > wrote:
> >
> >> Understood. Makes sense.
> >>
> >> For this, you should apply Streams configs manually when creating those
> >> topics. For retention parameter, use the value you specify in
> >> corresponding .until() method for it.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 12/14/16 10:08 AM, Sachin Mittal wrote:
> >>> I was referring to internal change log topic. I had to create them
> >> manually
> >>> because in some case the message size of these topic were greater than
> >> the
> >>> default ones used by kafka streams.
> >>>
> >>> I think someone in this group recommended to create these topic
> >> manually. I
> >>> understand that it is better to have internal topics created by streams
> >> app
> >>> and I will take a second look at these and see if that can be done.
> >>>
> >>> I just wanted to make sure what all configs are applied to internal
> >> topics
> >>> in order to decide to avoid them creating manually.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <
> matthias@confluent.io
> >>>
> >>> wrote:
> >>>
> >>>> I am wondering about "I create internal topic manually" -- which
> topics
> >>>> do you refer in detail?
> >>>>
> >>>> Kafka Streams create all kind of internal topics with auto-generated
> >>>> names. So it would be quite tricky to create all of them manually
> >>>> (especially because you need to know those name in advance).
> >>>>
> >>>> IIRC, if a topic does exist, Kafka Streams does not change its
> >>>> configuration. Only if Kafka Streams does create a topic, it will
> >>>> specify certain config parameters on topic create step.
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>>
> >>>> On 12/13/16 8:16 PM, Sachin Mittal wrote:
> >>>>> Hi,
> >>>>> Thanks for the explanation. This illustration makes it super easy to
> >>>>> understand how until works. Perhaps we can update the wiki with this
> >>>>> illustration.
> >>>>> It is basically the retention time for a past window.
> >>>>> I used to think until creates all the future windows for that period
> >> and
> >>>>> when time passes that it used to delete all the past windows. However
> >>>>> actually until retains a window for specified time. This makes so
> much
> >>>> more
> >>>>> sense.
> >>>>>
> >>>>> I just had one pending query regarding:
> >>>>>
> >>>>>> windowstore.changelog.additional.retention.ms
> >>>>>
> >>>>> How does this relate to the retention.ms param of topic config?
> >>>>> I create internal topics manually using, say, retention.ms=3600000.
> >>>>> In next release (post kafka_2.10-0.10.0.1) since we support delete of
> >>>>> internal changelog topic as well and I want it to be retained for say
> >>>> just
> >>>>> 1 hour.
> >>>>> So how does that above parameter interfere with this topic level
> >> setting.
> >>>>> Or now I just need to set above config as 3600000 and not add
> >>>>> retention.ms=3600000
> >>>>> while creating internal topic.
> >>>>>
> >>>>> Thanks
> >>>>> Sachin
> >>>>>
> >>>>>
> >>>>> On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax <
> >> matthias@confluent.io
> >>>>>
> >>>>> wrote:
> >>>>>
> >>>>>> First, windows are only created if there is actual data for a
> window.
> >> So
> >>>>>> you get windows [0, 50), [25, 75), [50, 100) only if there are
> record
> >>>>>> falling into each window (btw: window start-time is inclusive while
> >>>>>> window end time is exclusive). If you have only 2 record with lets
> say
> >>>>>> ts=20 and ts=90 you will not have an open window [25,75). Each
> window
> >> is
> >>>>>> physically created each time the first record for it is processed.
> >>>>>>
> >>>>>> If you have above 4 windows and a record with ts=101 arrives, a new
> >>>>>> window [101,151) will be created. Window [0,50) will not be deleted
> >> yet,
> >>>>>> because retention is 100 and thus Streams guarantees that all record
> >>>>>> with ts >= 1 (= 101 - 100) are still processed correctly and those
> >>>>>> records would fall into window [0,50).
> >>>>>>
> >>>>>> Thus, window [0,50) can be dropped, if time advanced to TS = 150,
> but
> >>>>>> not before that.
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> >>>>>>> Hi,
> >>>>>>> So is until for future or past?
> >>>>>>> Say I get first record at t = 0 and until is 100 and my window size
> >> is
> >>>> 50
> >>>>>>> advance by 25.
> >>>>>>> I understand it will create windows (0, 50), (25, 75), (50, 100)
> >>>>>>> Now at t = 101 it will drop
> >>>>>>> (0, 50), (25, 75), (50, 100) and create
> >>>>>>> (101, 150), (125, 175), (150, 200)
> >>>>>>>
> >>>>>>> Please confirm if this understanding is correct. It is not clear
> how
> >> it
> >>>>>>> will handle overlapping windows (75, 125) and (175, 225) and so on?
> >>>>>>>
> >>>>>>> What case is not clear again is that at say t = 102 I get some
> >> message
> >>>>>> with
> >>>>>>> timestamp 99. What happens then?
> >>>>>>> Will the result be added to the previous aggregation of (50, 100) or (75,
> >>>> 125),
> >>>>>>> like it should.
> >>>>>>>
> >>>>>>> Or it will recreate the old window (50, 100) and aggregate the
> value
> >>>>>> there
> >>>>>>> and then drop it. This would result in a wrong aggregated value, as
> it
> >>>> does
> >>>>>>> not consider the previous aggregated values.
> >>>>>>>
> >>>>>>> So this is the pressing case I am not able to understand. Maybe I
> am
> >>>>>> wrong
> >>>>>>> at some basic understanding.
> >>>>>>>
> >>>>>>>
> >>>>>>> Next for
> >>>>>>> The parameter
> >>>>>>>> windowstore.changelog.additional.retention.ms
> >>>>>>>
> >>>>>>> How does this relate to the retention.ms param of topic config?
> >>>>>>> I create internal topics manually using, say, retention.ms=3600000.
> >>>>>>> In next release (post kafka_2.10-0.10.0.1) since we support delete
> of
> >>>>>>> internal changelog topic as well and I want it to be retained for
> say
> >>>>>> just
> >>>>>>> 1 hour.
> >>>>>>> So how does that above parameter interfere with this topic level
> >>>> setting.
> >>>>>>> Or now I just need to set above config as 3600000 and not add
> >>>>>>> retention.ms=3600000
> >>>>>>> while creating internal topic.
> >>>>>>> This is just another doubt remaining here.
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>> Sachin
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <
> >>>> matthias@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Sachin,
> >>>>>>>>
> >>>>>>>> There is no reason to have an .until() AND a .retain() -- just
> >>>> increase
> >>>>>>>> the value of .until()
> >>>>>>>>
> >>>>>>>> If you have a window of let's say 1h size and you set .until()
> also
> >> to
> >>>>>>>> 1h -- you can obviously not process any late arriving data. If you
> >> set
> >>>>>>>> until() to 2h in this example, you can process data that is up to
> 1h
> >>>>>>>> delayed.
> >>>>>>>>
> >>>>>>>> So basically, the retention should always be larger than your
> window
> >>>>>> size.
> >>>>>>>>
> >>>>>>>> The parameter
> >>>>>>>>> windowstore.changelog.additional.retention.ms
> >>>>>>>>
> >>>>>>>> applies to changelog topics that back up window state stores.
> >> Those
> >>>>>>>> changelog topics are compacted. However, the used key does encode
> an
> >>>>>>>> window ID and thus older data can never be cleaned up by
> compaction.
> >>>>>>>> Therefore, an additional retention time is applied to those
> topics,
> >>>> too.
> >>>>>>>> Thus, if an old window is not updated for this amount of time, it
> >> will
> >>>>>>>> get deleted eventually, preventing this topic from growing infinitely.
> >>>>>>>>
> >>>>>>>> The value will be determined by until(), i.e., whatever you
> specify
> >> in
> >>>>>>>> .until() will be used to set this parameter.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> >>>>>>>>> Hi,
> >>>>>>>>> We are facing the exact problem as described by Matthias above.
> >>>>>>>>> We are keeping default until which is 1 day.
> >>>>>>>>>
> >>>>>>>>> Our record's timestamp extractor has a field which increases
> with
> >>>>>> time.
> >>>>>>>>> However, for a short time we cannot guarantee the timestamp
> always
> >>>>>>>>> increases. So at the boundary, i.e. after 24 hrs, we can get records
> >>>> which
> >>>>>>>> are
> >>>>>>>>> beyond that window's retention period.
> >>>>>>>>>
> >>>>>>>>> Then it happens like it is mentioned above and our aggregation
> >> fails.
> >>>>>>>>>
> >>>>>>>>> So just to sum up: when we get a record at
> >>>>>>>>> 24h + 1 sec (it deletes older window and since the new record
> >> belongs
> >>>>>> to
> >>>>>>>>> the new window it gets created)
> >>>>>>>>> Now when we get the next record at 24 hrs - 1 sec, since the older window
> is
> >>>>>>>> dropped
> >>>>>>>>> it does not get aggregated in that bucket.
> >>>>>>>>>
> >>>>>>>>> I suggest we have another setting next to until called retain which
> >>>>>> retains
> >>>>>>>>> the older windows into next window.
> >>>>>>>>>
> >>>>>>>>> I think at stream window boundary level it should use a concept
> of
> >>>>>>>> sliding
> >>>>>>>>> window. So we can define window like
> >>>>>>>>>
> >>>>>>>>> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
> >>>>>>>> 1000l).until(7
> >>>>>>>>> * 24 * 3600 * 1000l).retain(900 * 1000l)
> >>>>>>>>>
> >>>>>>>>> So after 7 days it retains the data covered by windows in last 15
> >>>>>> minutes
> >>>>>>>>> which rolls over the data in them to next window. This way
> streams
> >>>> work
> >>>>>>>>> continuously.
> >>>>>>>>>
> >>>>>>>>> Please let us know your thoughts on this.
> >>>>>>>>>
> >>>>>>>>> On another side question on this there is a setting:
> >>>>>>>>>
> >>>>>>>>> windowstore.changelog.additional.retention.ms
> >>>>>>>>> It is not clear what it does. Is this the default for until?
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>> Sachin
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <
> >>>>>> matthias@confluent.io
> >>>>>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Windows are created on demand, ie, each time a new record
> arrives
> >>>> and
> >>>>>>>>>> there is no window yet for it, a new window will get created.
> >>>>>>>>>>
> >>>>>>>>>> Windows are accepting data until their retention time (that you
> >> can
> >>>>>>>>>> configure via .until()) passed. Thus, you will have many windows
> >>>> being
> >>>>>>>>>> open in parallel.
> >>>>>>>>>>
> >>>>>>>>>> If you read older data, they will just be put into the
> >> corresponding
> >>>>>>>>>> windows (as long as window retention time did not pass). If a
> >> window
> >>>>>> was
> >>>>>>>>>> discarded already, a new window with this single (later
> arriving)
> >>>>>> record
> >>>>>>>>>> will get created, the computation will be triggered, you get a
> >>>> result,
> >>>>>>>>>> and afterwards the window is deleted again (as its retention
> time
> >>>>>>>>>> passed already).
> >>>>>>>>>>
> >>>>>>>>>> The retention time is driven by "stream-time", an internally
> tracked
> >>>>>> time
> >>>>>>>>>> that only progresses in the forward direction. It gets its value from
> >> the
> >>>>>>>>>> timestamps provided by TimestampExtractor -- thus, per default
> it
> >>>> will
> >>>>>>>>>> be event-time.
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> >>>>>>>>>>> I've read this and still have more questions than answers. If
> my
> >>>> data
> >>>>>>>>>> skips
> >>>>>>>>>>> about (timewise) what determines when a given window will
> start /
> >>>>>> stop
> >>>>>>>>>>> accepting new data? What if Im reading data from some time ago?
> >>>>>>>>>>>
> >>>>>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <
> >>>>>>>> matthias@confluent.io>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Please have a look here:
> >>>>>>>>>>>>
> >>>>>>>>>>>> http://docs.confluent.io/current/streams/developer-
> >>>>>>>>>>>> guide.html#windowing-a-stream
> >>>>>>>>>>>>
> >>>>>>>>>>>> If you have further question, just follow up :)
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> >>>>>>>>>>>>> Ive added the 'until()' clause to some aggregation steps and
> >> it's
> >>>>>>>>>> working
> >>>>>>>>>>>>> wonders for keeping the size of the state store in useful
> >>>>>>>> boundaries...
> >>>>>>>>>>>> But
> >>>>>>>>>>>>> Im not 100% clear on how it works.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> What is implied by the '.until()' clause? What determines
> when
> >> to
> >>>>>>>> stop
> >>>>>>>>>>>>> receiving further data - is it clock time (since the window
> was
> >>>>>>>>>> created)?
> >>>>>>>>>>>>> It seems problematic for it to refer to EventTime as this may
> >>>>>> bounce
> >>>>>>>>>> all
> >>>>>>>>>>>>> over the place. For non-overlapping windows a given record
> can
> >>>> only
> >>>>>>>>>> fall
> >>>>>>>>>>>>> into a single aggregation period - so when would a value get
> >>>>>>>> discarded?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 *
> >>>>>>>>>>>> 1000L).until(10 *
> >>>>>>>>>>>>> 1000L))'  - but what is this accomplishing?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Re: How does 'TimeWindows.of().until()' work?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I agree. We have already gotten multiple requests to add an API for
specifying topic parameters for internal topics... I am pretty sure we
will add it if time permits -- feel free to contribute this new feature!

About changing the value of until: that does not work, as the changelog
topic configuration would not be updated.
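
If you do change until() on an existing app, a manual workaround is to
update the changelog topic's retention yourself to match. A rough sketch
using the old ZK-based admin API (the topic name below is a made-up example
of the auto-generated <application.id>-<store name>-changelog pattern):

// Sketch only; assumes kafka.utils.ZkUtils and kafka.admin.AdminUtils.
ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, false);
Properties config = new Properties();
config.put("retention.ms", "7200000"); // new until() value (+ additional retention)
AdminUtils.changeTopicConfig(zkUtils, "my-app-test-table-changelog", config);
zkUtils.close();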


-Matthias

On 12/14/16 8:22 PM, Sachin Mittal wrote:
> Hi,
> I suggest to include topic config as well as part of streams config
> properties like we do for producer and consumer configs.
> The topic config supplied would be used for creating internal changelog
> topics along with certain additional configs which are applied by default.
> 
> This way we don't have to ever create internal topics manually.
> 
> I had one doubt regarding until.
> Say I specify one value and run my streams app.
> Now I stop the app, specify different value and re start the app.
> 
> Which value for retain would the old (pre existing) windows use. Would it
> be the older value or the new value?
> 
> Thanks
> Sachin
> 
> 
> 
> On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> Understood. Makes sense.
>>
>> For this, you should apply Streams configs manually when creating those
>> topics. For retention parameter, use the value you specify in
>> corresponding .until() method for it.
>>
>>
>> -Matthias
>>
>>
>> On 12/14/16 10:08 AM, Sachin Mittal wrote:
>>> I was referring to internal change log topic. I had to create them
>> manually
>>> because in some case the message size of these topic were greater than
>> the
>>> default ones used by kafka streams.
>>>
>>> I think someone in this group recommended to create these topic
>> manually. I
>>> understand that it is better to have internal topics created by streams
>> app
>>> and I will take a second look at these and see if that can be done.
>>>
>>> I just wanted to make sure what all configs are applied to internal
>> topics
>>> in order to decide to avoid them creating manually.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <matthias@confluent.io
>>>
>>> wrote:
>>>
>>>> I am wondering about "I create internal topic manually" -- which topics
>>>> do you refer in detail?
>>>>
>>>> Kafka Streams create all kind of internal topics with auto-generated
>>>> names. So it would be quite tricky to create all of them manually
>>>> (especially because you need to know those name in advance).
>>>>
>>>> IIRC, if a topic does exist, Kafka Streams does not change its
>>>> configuration. Only if Kafka Streams does create a topic, it will
>>>> specify certain config parameters on topic create step.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>
>>>> On 12/13/16 8:16 PM, Sachin Mittal wrote:
>>>>> Hi,
>>>>> Thanks for the explanation. This illustration makes it super easy to
>>>>> understand how until works. Perhaps we can update the wiki with this
>>>>> illustration.
>>>>> It is basically the retention time for a past window.
>>>>> I used to think until creates all the future windows for that period
>> and
>>>>> when time passes that it used to delete all the past windows. However
>>>>> actually until retains a window for specified time. This makes so much
>>>> more
>>>>> sense.
>>>>>
>>>>> I just had one pending query regarding:
>>>>>
>>>>>> windowstore.changelog.additional.retention.ms
>>>>>
>>>>> How does this relate to the retention.ms param of topic config?
>>>>> I create internal topics manually using, say, retention.ms=3600000.
>>>>> In next release (post kafka_2.10-0.10.0.1) since we support delete of
>>>>> internal changelog topic as well and I want it to be retained for say
>>>> just
>>>>> 1 hour.
>>>>> So how does that above parameter interfere with this topic level
>> setting.
>>>>> Or now I just need to set above config as 3600000 and not add
>>>>> retention.ms=3600000
>>>>> while creating internal topic.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>>
>>>>> On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax <
>> matthias@confluent.io
>>>>>
>>>>> wrote:
>>>>>
>>>>>> First, windows are only created if there is actual data for a window.
>> So
>>>>>> you get windows [0, 50), [25, 75), [50, 100) only if there are record
>>>>>> falling into each window (btw: window start-time is inclusive while
>>>>>> window end time is exclusive). If you have only 2 record with lets say
>>>>>> ts=20 and ts=90 you will not have an open window [25,75). Each window
>> is
>>>>>> physically created each time the first record for it is processed.
>>>>>>
>>>>>> If you have above 4 windows and a record with ts=101 arrives, a new
>>>>>> window [101,151) will be created. Window [0,50) will not be deleted
>> yet,
>>>>>> because retention is 100 and thus Streams guarantees that all record
>>>>>> with ts >= 1 (= 101 - 100) are still processed correctly and those
>>>>>> records would fall into window [0,50).
>>>>>>
>>>>>> Thus, window [0,50) can be dropped, if time advanced to TS = 150, but
>>>>>> not before that.
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 12/13/16 12:06 AM, Sachin Mittal wrote:
>>>>>>> Hi,
>>>>>>> So is until for future or past?
>>>>>>> Say I get first record at t = 0 and until is 100 and my window size
>> is
>>>> 50
>>>>>>> advance by 25.
>>>>>>> I understand it will create windows (0, 50), (25, 75), (50, 100)
>>>>>>> Now at t = 101 it will drop
>>>>>>> (0, 50), (25, 75), (50, 100) and create
>>>>>>> (101, 150), (125, 175), (150, 200)
>>>>>>>
>>>>>>> Please confirm if this understanding is correct. It is not clear how
>> it
>>>>>>> will handle overlapping windows (75, 125) and (175, 225) and so on?
>>>>>>>
>>>>>>> What case is not clear again is that at say t = 102 I get some
>> message
>>>>>> with
>>>>>>> timestamp 99. What happens then?
>>>>>>> Will the result be added to the previous aggregation of (50, 100) or (75,
>>>> 125),
>>>>>>> like it should.
>>>>>>>
>>>>>>> Or it will recreate the old window (50, 100) and aggregate the value
>>>>>> there
>>>>>>> and then drop it. This would result in a wrong aggregated value, as it
>>>> does
>>>>>>> not consider the previous aggregated values.
>>>>>>>
>>>>>>> So this is the pressing case I am not able to understand. Maybe I am
>>>>>> wrong
>>>>>>> at some basic understanding.
>>>>>>>
>>>>>>>
>>>>>>> Next for
>>>>>>> The parameter
>>>>>>>> windowstore.changelog.additional.retention.ms
>>>>>>>
>>>>>>> How does this relate to the retention.ms param of topic config?
>>>>>>> I create internal topics manually using, say, retention.ms=3600000.
>>>>>>> In next release (post kafka_2.10-0.10.0.1) since we support delete of
>>>>>>> internal changelog topic as well and I want it to be retained for say
>>>>>> just
>>>>>>> 1 hour.
>>>>>>> So how does that above parameter interfere with this topic level
>>>> setting.
>>>>>>> Or now I just need to set above config as 3600000 and not add
>>>>>>> retention.ms=3600000
>>>>>>> while creating internal topic.
>>>>>>> This is just another doubt remaining here.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Sachin
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <
>>>> matthias@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Sachin,
>>>>>>>>
>>>>>>>> There is no reason to have an .until() AND a .retain() -- just
>>>> increase
>>>>>>>> the value of .until()
>>>>>>>>
>>>>>>>> If you have a window of let's say 1h size and you set .until() also
>> to
>>>>>>>> 1h -- you can obviously not process any late arriving data. If you
>> set
>>>>>>>> until() to 2h in this example, you can process data that is up to 1h
>>>>>>>> delayed.
>>>>>>>>
>>>>>>>> So basically, the retention should always be larger than your window
>>>>>> size.
>>>>>>>>
>>>>>>>> The parameter
>>>>>>>>> windowstore.changelog.additional.retention.ms
>>>>>>>>
>>>>>>>> applies to changelog topics that back up window state stores.
>> Those
>>>>>>>> changelog topics are compacted. However, the used key does encode an
>>>>>>>> window ID and thus older data can never be cleaned up by compaction.
>>>>>>>> Therefore, an additional retention time is applied to those topics,
>>>> too.
>>>>>>>> Thus, if an old window is not updated for this amount of time, it
>> will
>>>>>>>> get deleted eventually, preventing this topic from growing infinitely.
>>>>>>>>
>>>>>>>> The value will be determined by until(), i.e., whatever you specify
>> in
>>>>>>>> .until() will be used to set this parameter.
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
>>>>>>>>> Hi,
>>>>>>>>> We are facing the exact problem as described by Matthias above.
>>>>>>>>> We are keeping default until which is 1 day.
>>>>>>>>>
>>>>>>>>> Our record's timestamp extractor has a field which increases with
>>>>>> time.
>>>>>>>>> However, for a short time we cannot guarantee the timestamp always
>>>>>>>>> increases. So at the boundary, i.e. after 24 hrs, we can get records
>>>> which
>>>>>>>> are
>>>>>>>>> beyond that window's retention period.
>>>>>>>>>
>>>>>>>>> Then it happens like it is mentioned above and our aggregation
>> fails.
>>>>>>>>>
>>>>>>>>> So just to sum up: when we get a record at
>>>>>>>>> 24h + 1 sec (it deletes older window and since the new record
>> belongs
>>>>>> to
>>>>>>>>> the new window it gets created)
>>>>>>>>> Now when we get the next record at 24 hrs - 1 sec, since the older window is
>>>>>>>> dropped
>>>>>>>>> it does not get aggregated in that bucket.
>>>>>>>>>
>>>>>>>>> I suggest we have another setting next to until called retain which
>>>>>> retains
>>>>>>>>> the older windows into next window.
>>>>>>>>>
>>>>>>>>> I think at stream window boundary level it should use a concept of
>>>>>>>> sliding
>>>>>>>>> window. So we can define window like
>>>>>>>>>
>>>>>>>>> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
>>>>>>>> 1000l).until(7
>>>>>>>>> * 24 * 3600 * 1000l).retain(900 * 1000l)
>>>>>>>>>
>>>>>>>>> So after 7 days it retains the data covered by windows in last 15
>>>>>> minutes
>>>>>>>>> which rolls over the data in them to next window. This way streams
>>>> work
>>>>>>>>> continuously.
>>>>>>>>>
>>>>>>>>> Please let us know your thoughts on this.
>>>>>>>>>
>>>>>>>>> On another side question on this there is a setting:
>>>>>>>>>
>>>>>>>>> windowstore.changelog.additional.retention.ms
>>>>>>>>> It is not clear what it does. Is this the default for until?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Sachin
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <
>>>>>> matthias@confluent.io
>>>>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Windows are created on demand, ie, each time a new record arrives
>>>> and
>>>>>>>>>> there is no window yet for it, a new window will get created.
>>>>>>>>>>
>>>>>>>>>> Windows are accepting data until their retention time (that you
>> can
>>>>>>>>>> configure via .until()) passed. Thus, you will have many windows
>>>> being
>>>>>>>>>> open in parallel.
>>>>>>>>>>
>>>>>>>>>> If you read older data, they will just be put into the
>> corresponding
>>>>>>>>>> windows (as long as window retention time did not pass). If a
>> window
>>>>>> was
>>>>>>>>>> discarded already, a new window with this single (later arriving)
>>>>>> record
>>>>>>>>>> will get created, the computation will be triggered, you get a
>>>> result,
>>>>>>>>>> and afterwards the window is deleted again (as its retention time
>>>>>>>>>> passed already).
>>>>>>>>>>
>>>>>>>>>> The retention time is driven by "stream-time", an internally tracked
>>>>>> time
>>>>>>>>>> that only progresses in the forward direction. It gets its value from
>> the
>>>>>>>>>> timestamps provided by TimestampExtractor -- thus, per default it
>>>> will
>>>>>>>>>> be event-time.
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
>>>>>>>>>>> I've read this and still have more questions than answers. If my
>>>> data
>>>>>>>>>> skips
>>>>>>>>>>> about (timewise) what determines when a given window will start /
>>>>>> stop
>>>>>>>>>>> accepting new data? What if Im reading data from some time ago?
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <
>>>>>>>> matthias@confluent.io>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Please have a look here:
>>>>>>>>>>>>
>>>>>>>>>>>> http://docs.confluent.io/current/streams/developer-
>>>>>>>>>>>> guide.html#windowing-a-stream
>>>>>>>>>>>>
>>>>>>>>>>>> If you have further question, just follow up :)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
>>>>>>>>>>>>> Ive added the 'until()' clause to some aggregation steps and
>> it's
>>>>>>>>>> working
>>>>>>>>>>>>> wonders for keeping the size of the state store in useful
>>>>>>>> boundaries...
>>>>>>>>>>>> But
>>>>>>>>>>>>> Im not 100% clear on how it works.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What is implied by the '.until()' clause? What determines when
>> to
>>>>>>>> stop
>>>>>>>>>>>>> receiving further data - is it clock time (since the window was
>>>>>>>>>> created)?
>>>>>>>>>>>>> It seems problematic for it to refer to EventTime as this may
>>>>>> bounce
>>>>>>>>>> all
>>>>>>>>>>>>> over the place. For non-overlapping windows a given record can
>>>> only
>>>>>>>>>> fall
>>>>>>>>>>>>> into a single aggregation period - so when would a value get
>>>>>>>> discarded?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 *
>>>>>>>>>>>> 1000L).until(10 *
>>>>>>>>>>>>> 1000L))'  - but what is this accomplishing?
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
> 


>>>>>>>>> window. So we can define window like
>>>>>>>>>
>>>>>>>>> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
>>>>>>>> 1000l).untill(7
>>>>>>>>> * 24 * 3600 * 1000l).retain(900 * 1000l)
>>>>>>>>>
>>>>>>>>> So after 7 days it retains the data covered by windows in last 15
>>>>>> minutes
>>>>>>>>> which rolls over the data in them to next window. This way streams
>>>> work
>>>>>>>>> continuously.
>>>>>>>>>
>>>>>>>>> Please let us know your thoughts on this.
>>>>>>>>>
>>>>>>>>> On another side question on this there is a setting:
>>>>>>>>>
>>>>>>>>> windowstore.changelog.additional.retention.ms
>>>>>>>>> I is not clear what is does. Is this the default for until?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Sachin
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <
>>>>>> matthias@confluent.io
>>>>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Windows are created on demand, ie, each time a new record arrives
>>>> and
>>>>>>>>>> there is no window yet for it, a new window will get created.
>>>>>>>>>>
>>>>>>>>>> Windows are accepting data until their retention time (that you
>> can
>>>>>>>>>> configure via .until()) passed. Thus, you will have many windows
>>>> being
>>>>>>>>>> open in parallel.
>>>>>>>>>>
>>>>>>>>>> If you read older data, they will just be put into the
>> corresponding
>>>>>>>>>> windows (as long as window retention time did not pass). If a
>> window
>>>>>> was
>>>>>>>>>> discarded already, a new window with this single (later arriving)
>>>>>> record
>>>>>>>>>> will get created, the computation will be triggered, you get a
>>>> result,
>>>>>>>>>> and afterwards the window is deleted again (as it's retention time
>>>>>>>>>> passed already).
>>>>>>>>>>
>>>>>>>>>> The retention time is driven by "stream-time", in internal tracked
>>>>>> time
>>>>>>>>>> that only progressed in forward direction. It gets it value from
>> the
>>>>>>>>>> timestamps provided by TimestampExtractor -- thus, per default it
>>>> will
>>>>>>>>>> be event-time.
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
>>>>>>>>>>> I've read this and still have more questions than answers. If my
>>>> data
>>>>>>>>>> skips
>>>>>>>>>>> about (timewise) what determines when a given window will start /
>>>>>> stop
>>>>>>>>>>> accepting new data? What if Im reading data from some time ago?
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <
>>>>>>>> matthias@confluent.io>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Please have a look here:
>>>>>>>>>>>>
>>>>>>>>>>>> http://docs.confluent.io/current/streams/developer-
>>>>>>>>>>>> guide.html#windowing-a-stream
>>>>>>>>>>>>
>>>>>>>>>>>> If you have further question, just follow up :)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
>>>>>>>>>>>>> Ive added the 'until()' clause to some aggregation steps and
>> it's
>>>>>>>>>> working
>>>>>>>>>>>>> wonders for keeping the size of the state store in useful
>>>>>>>> boundaries...
>>>>>>>>>>>> But
>>>>>>>>>>>>> Im not 100% clear on how it works.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What is implied by the '.until()' clause? What determines when
>> to
>>>>>>>> stop
>>>>>>>>>>>>> receiving further data - is it clock time (since the window was
>>>>>>>>>> created)?
>>>>>>>>>>>>> It seems problematic for it to refer to EventTime as this may
>>>>>> bounce
>>>>>>>>>> all
>>>>>>>>>>>>> over the place. For non-overlapping windows a given record can
>>>> only
>>>>>>>>>> fall
>>>>>>>>>>>>> into a single aggregation period - so when would a value get
>>>>>>>> discarded?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 *
>>>>>>>>>>>> 1000L).until(10 *
>>>>>>>>>>>>> 1000L))'  - but what is this accomplishing?
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
> 


Re: How does 'TimeWindows.of().until()' work?

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
I suggest including topic configs as part of the streams config properties,
like we do for producer and consumer configs.
The supplied topic configs would be used when creating the internal changelog
topics, along with certain additional configs which are applied by default.

This way we don't have to ever create internal topics manually.
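
Something like this hypothetical sketch -- the "topic." prefix below does not
exist today, it is only meant to illustrate the suggested pass-through:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // hypothetical "topic." pass-through, applied whenever Kafka Streams
    // creates an internal topic:
    props.put("topic.retention.ms", "3600000");
    props.put("topic.max.message.bytes", "10485760");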

I had one doubt regarding until().
Say I specify one value and run my streams app.
Now I stop the app, specify a different value, and restart the app.

Which retention value would the old (pre-existing) windows use? Would it be
the older value or the new value?

Thanks
Sachin



On Thu, Dec 15, 2016 at 12:26 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Understood. Makes sense.
>
> For this, you should apply the Streams configs manually when creating those
> topics. For the retention parameter, use the value you specify in the
> corresponding .until() method.
>
>
> -Matthias

Re: How does 'TimeWindows.of().until()' work?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Understood. Makes sense.

For this, you should apply the Streams configs manually when creating those
topics. For the retention parameter, use the value you specify in the
corresponding .until() method.
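
For example, a rough sketch (0.10.1-style API; the stream, store, and topic
names are made up): whatever you pass to .until() is the value to mirror in
retention.ms of the manually created changelog topic.

    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    // assuming: KStream<String, String> stream = builder.stream("input");
    long maintainMs = 7 * 24 * 3600 * 1000L;           // keep windows 7 days
    KTable<Windowed<String>, Long> counts = stream
        .groupByKey()
        .count(TimeWindows.of(3600 * 1000L)            // 1h windows
                          .advanceBy(1800 * 1000L)     // hopping by 30 min
                          .until(maintainMs),          // window retention
               "counts-store");
    // a manually created "<application.id>-counts-store-changelog" topic
    // should then get retention.ms >= maintainMs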


-Matthias


On 12/14/16 10:08 AM, Sachin Mittal wrote:
> I was referring to the internal changelog topics. I had to create them
> manually because in some cases the message size for these topics was greater
> than the default used by Kafka Streams.
> 
> I think someone in this group recommended creating these topics manually. I
> understand that it is better to have the internal topics created by the
> streams app, and I will take a second look at these and see if that can be
> done.
> 
> I just wanted to find out which configs are applied to internal topics, in
> order to decide whether to avoid creating them manually.
> 
> Thanks
> Sachin


Re: How does 'TimeWindows.of().until()' work?

Posted by Sachin Mittal <sj...@gmail.com>.
I was referring to the internal changelog topics. I had to create them
manually because in some cases the message size for these topics was greater
than the default used by Kafka Streams.

I think someone in this group recommended creating these topics manually. I
understand that it is better to have the internal topics created by the
streams app, and I will take a second look at these and see if that can be
done.

I just wanted to find out which configs are applied to internal topics, in
order to decide whether to avoid creating them manually.

Thanks
Sachin


On Wed, Dec 14, 2016 at 11:08 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> I am wondering about "I create internal topic manually" -- which topics
> do you refer to in detail?
>
> Kafka Streams creates all kinds of internal topics with auto-generated
> names. So it would be quite tricky to create all of them manually
> (especially because you need to know those names in advance).
>
> IIRC, if a topic does exist, Kafka Streams does not change its
> configuration. Only if Kafka Streams creates a topic itself will it
> specify certain config parameters at topic creation.
>
>
> -Matthias

Re: How does 'TimeWindows.of().until()' work?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I am wondering about "I create internal topic manually" -- which topics
do you refer to in detail?

Kafka Streams creates all kinds of internal topics with auto-generated
names. So it would be quite tricky to create all of them manually
(especially because you need to know those names in advance).
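
Changelog topics for explicitly named state stores are the predictable ones;
a sketch of the naming convention, with hypothetical names:

    // convention: "<application.id>-<storeName>-changelog"
    String applicationId = "my-streams-app";   // hypothetical
    String storeName = "counts-store";         // hypothetical
    String changelog = applicationId + "-" + storeName + "-changelog";
    // -> "my-streams-app-counts-store-changelog"

The auto-generated names (e.g., for repartition topics) are the tricky part.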

IIRC, if a topic does exist, Kafka Streams does not change its
configuration. Only if Kafka Streams creates a topic itself will it
specify certain config parameters at topic creation.


-Matthias



On 12/13/16 8:16 PM, Sachin Mittal wrote:
> Hi,
> Thanks for the explanation. This illustration makes it super easy to
> understand how until() works. Perhaps we can update the wiki with this
> illustration.
> It is basically the retention time for a past window.
> I used to think until() creates all the future windows for that period
> and, once time passes, deletes all the past windows. Actually, until()
> retains a window for the specified time. This makes so much more sense.
> 
> I just had one pending query regarding:
> 
>> windowstore.changelog.additional.retention.ms
> 
> How does this relate to the retention.ms param of the topic config?
> I create the internal topic manually using, say, retention.ms=3600000.
> In the next release (post kafka_2.10-0.10.0.1), since we support delete for
> the internal changelog topics as well, I want them to be retained for, say,
> just 1 hour.
> So how does the above parameter interact with this topic-level setting?
> Or do I now just need to set the above config to 3600000 and not add
> retention.ms=3600000 while creating the internal topic?
> 
> Thanks
> Sachin


Re: How does 'TimeWindows.of().until()' work?

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
Thanks for the explanation. This illustration makes it super easy to
understand how until() works. Perhaps we can update the wiki with this
illustration.
It is basically the retention time for a past window.
I used to think until() creates all the future windows for that period and,
once time passes, deletes all the past windows. Actually, until() retains a
window for the specified time. This makes so much more sense.
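
A quick arithmetic sketch of the rule as I understand it (not actual Streams
internals): a window [start, start + size) is retained while
streamTime - maintainMs < start + size, where maintainMs is the until() value.

    long size = 50L, maintainMs = 100L;
    long windowStart = 0L;
    long windowEnd = windowStart + size;                    // window [0, 50)
    long streamTime = 101L;                                 // max ts seen
    boolean retained = streamTime - maintainMs < windowEnd; // 1 < 50: kept
    streamTime = 150L;
    retained = streamTime - maintainMs < windowEnd;         // 50 < 50: dropped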

I just had one pending query regarding:

> windowstore.changelog.additional.retention.ms

How does this relate to the retention.ms param of the topic config?
I create the internal topic manually using, say, retention.ms=3600000.
In the next release (post kafka_2.10-0.10.0.1), since we support delete for
the internal changelog topics as well, I want them to be retained for, say,
just 1 hour.
So how does the above parameter interact with this topic-level setting?
Or do I now just need to set the above config to 3600000 and not add
retention.ms=3600000 while creating the internal topic?
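
My current reading, stated as an assumption to be corrected: for a changelog
topic that Streams creates itself, the effective retention would be roughly
the until() value plus the additional margin, along these lines:

    long untilMs = 3600 * 1000L;             // .until(1 hour)
    long additionalMs = 24 * 3600 * 1000L;   // assumed value of
                                             // ...additional.retention.ms
    long changelogRetentionMs = untilMs + additionalMs;  // ~25 hours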

Thanks
Sachin


On Tue, Dec 13, 2016 at 11:27 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> First, windows are only created if there is actual data for a window. So
> you get windows [0, 50), [25, 75), [50, 100) only if there are records
> falling into each window (btw: window start time is inclusive while
> window end time is exclusive). If you have only 2 records with, let's say,
> ts=20 and ts=90, you will not have an open window [25,75). Each window is
> physically created the first time a record for it is processed.
>
> If you have the above 4 windows and a record with ts=101 arrives, a new
> window [101,151) will be created. Window [0,50) will not be deleted yet,
> because retention is 100 and thus Streams guarantees that all records
> with ts >= 1 (= 101 - 100) are still processed correctly, and those
> records would fall into window [0,50).
>
> Thus, window [0,50) can only be dropped once time advances to TS = 150,
> not before that.
>
> -Matthias
>
>
> On 12/13/16 12:06 AM, Sachin Mittal wrote:
> > Hi,
> > So is until for future or past?
> > Say I get first record at t = 0 and until is 100 and my window size is 50
> > advance by 25.
> > I understand it will create windows (0, 50), (25, 75), (50, 100)
> > Now at t = 101 it will drop
> > (0, 50), (25, 75), (50, 100) and create
> > (101, 150), (125, 175), (150, 200)
> >
> > Please confirm if this understanding us correct. It is not clear how it
> > will handle overlapping windows (75, 125) and (175, 225) and so on?
> >
> > What case is not clear again is that at say t = 102 I get some message
> with
> > timestamp 99. What happens then?
> > Will the result added to previous aggregation of (50, 100) or (75, 125),
> > like it should.
> >
> > Or it will recreate the old window (50, 100) and aggregate the value
> there
> > and then drop it. This would result is wrong aggregated value, as it does
> > not consider the previous aggregated values.
> >
> > So this is the pressing case I am not able to understand. Maybe I am
> wrong
> > at some basic understanding.
> >
> >
> > Next for
> > The parameter
> >> windowstore.changelog.additional.retention.ms
> >
> > How does this relate to rentention.ms param of topic config?
> > I create internal topic manually using say rentention.ms=3600000.
> > In next release (post kafka_2.10-0.10.0.1) since we support delete of
> > internal changelog topic as well and I want it to be retained for say
> just
> > 1 hour.
> > So how does that above parameter interfere with this topic level setting.
> > Or now I just need to set above config as 3600000 and not add
> > rentention.ms=3600000
> > while creating internal topic.
> > This is just another doubt remaining here.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> Sachin,
> >>
> >> There is no reason to have an .until() AND a .retain() -- just increase
> >> the value of .until()
> >>
> >> If you have a window of let's say 1h size and you set .until() also to
> >> 1h -- you can obviously not process any late arriving data. If you set
> >> until() to 2h is this example, you can process data that is up to 1h
> >> delayed.
> >>
> >> So basically, the retention should always be larger than you window
> size.
> >>
> >> The parameter
> >>> windowstore.changelog.additional.retention.ms
> >>
> >> is applies to changelog topics that backup window state stores. Those
> >> changelog topics are compacted. However, the used key does encode an
> >> window ID and thus older data can never be cleaned up by compaction.
> >> Therefore, an additional retention time is applied to those topics, too.
> >> Thus, if an old window is not updated for this amount of time, it will
> >> get deleted eventually preventing this topic to grown infinitely.
> >>
> >> The value will be determined by until(), i.e., whatever you specify in
> >> .until() will be used to set this parameter.
> >>
> >>
> >> -Matthias
> >>
> >> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> >>> Hi,
> >>> We are facing the exact problem as described by Matthias above.
> >>> We are keeping default until which is 1 day.
> >>>
> >>> Our record's times tamp extractor has a field which increases with
> time.
> >>> However for short time we cannot guarantee the time stamp is always
> >>> increases. So at the boundary ie after 24 hrs we can get records which
> >> are
> >>> beyond that windows retention period.
> >>>
> >>> Then it happens like it is mentioned above and our aggregation fails.
> >>>
> >>> So just to sum up when we get record
> >>> 24h + 1 sec (it deletes older window and since the new record belongs
> to
> >>> the new window its gets created)
> >>> Now when we get next record of 24 hs - 1 sec since older window is
> >> dropped
> >>> it does not get aggregated in that bucket.
> >>>
> >>> I suggest we have another setting next to until call retain which
> retains
> >>> the older windows into next window.
> >>>
> >>> I think at stream window boundary level it should use a concept of
> >> sliding
> >>> window. So we can define window like
> >>>
> >>> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
> >> 1000l).untill(7
> >>> * 24 * 3600 * 1000l).retain(900 * 1000l)
> >>>
> >>> So after 7 days it retains the data covered by windows in last 15
> minutes
> >>> which rolls over the data in them to next window. This way streams work
> >>> continuously.
> >>>
> >>> Please let us know your thoughts on this.
> >>>
> >>> On another side question on this there is a setting:
> >>>
> >>> windowstore.changelog.additional.retention.ms
> >>> I is not clear what is does. Is this the default for until?
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <
> matthias@confluent.io
> >>>
> >>> wrote:
> >>>
> >>>> Windows are created on demand, ie, each time a new record arrives and
> >>>> there is no window yet for it, a new window will get created.
> >>>>
> >>>> Windows are accepting data until their retention time (that you can
> >>>> configure via .until()) passed. Thus, you will have many windows being
> >>>> open in parallel.
> >>>>
> >>>> If you read older data, they will just be put into the corresponding
> >>>> windows (as long as window retention time did not pass). If a window
> was
> >>>> discarded already, a new window with this single (later arriving)
> record
> >>>> will get created, the computation will be triggered, you get a result,
> >>>> and afterwards the window is deleted again (as it's retention time
> >>>> passed already).
> >>>>
> >>>> The retention time is driven by "stream-time", in internal tracked
> time
> >>>> that only progressed in forward direction. It gets it value from the
> >>>> timestamps provided by TimestampExtractor -- thus, per default it will
> >>>> be event-time.
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> >>>>> I've read this and still have more questions than answers. If my data
> >>>> skips
> >>>>> about (timewise) what determines when a given window will start /
> stop
> >>>>> accepting new data? What if Im reading data from some time ago?
> >>>>>
> >>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <
> >> matthias@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> Please have a look here:
> >>>>>>
> >>>>>> http://docs.confluent.io/current/streams/developer-
> >>>>>> guide.html#windowing-a-stream
> >>>>>>
> >>>>>> If you have further question, just follow up :)
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> >>>>>>> Ive added the 'until()' clause to some aggregation steps and it's
> >>>> working
> >>>>>>> wonders for keeping the size of the state store in useful
> >> boundaries...
> >>>>>> But
> >>>>>>> Im not 100% clear on how it works.
> >>>>>>>
> >>>>>>> What is implied by the '.until()' clause? What determines when to
> >> stop
> >>>>>>> receiving further data - is it clock time (since the window was
> >>>> created)?
> >>>>>>> It seems problematic for it to refer to EventTime as this may
> bounce
> >>>> all
> >>>>>>> over the place. For non-overlapping windows a given record can only
> >>>> fall
> >>>>>>> into a single aggregation period - so when would a value get
> >> discarded?
> >>>>>>>
> >>>>>>> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 *
> >>>>>> 1000L).until(10 *
> >>>>>>> 1000L))'  - but what is this accomplishing?
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Re: How does 'TimeWindows.of().until()' work?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
First, windows are only created if there is actual data for a window. So
you get windows [0, 50), [25, 75), [50, 100) only if there are records
falling into each window (btw: the window start-time is inclusive while
the window end-time is exclusive). If you have only 2 records with, let's
say, ts=20 and ts=90, you will not have an open window [25,75). Each
window is physically created when the first record for it is processed.

If you have the above windows and a record with ts=101 arrives, new
windows [75,125) and [100,150) will be created if they do not exist yet
(hopping windows are aligned to multiples of the advance). Window [0,50)
will not be deleted yet, because retention is 100 and thus Streams
guarantees that all records with ts >= 1 (= 101 - 100) are still
processed correctly, and those records would fall into window [0,50).

Thus, window [0,50) can be dropped once stream-time advances to ts = 150
(window end 50 + retention 100), but not before that.

-Matthias


On 12/13/16 12:06 AM, Sachin Mittal wrote:
> Hi,
> So is until for future or past?
> Say I get first record at t = 0 and until is 100 and my window size is 50
> advance by 25.
> I understand it will create windows (0, 50), (25, 75), (50, 100)
> Now at t = 101 it will drop
> (0, 50), (25, 75), (50, 100) and create
> (101, 150), (125, 175), (150, 200)
> 
> Please confirm if this understanding us correct. It is not clear how it
> will handle overlapping windows (75, 125) and (175, 225) and so on?
> 
> What case is not clear again is that at say t = 102 I get some message with
> timestamp 99. What happens then?
> Will the result added to previous aggregation of (50, 100) or (75, 125),
> like it should.
> 
> Or it will recreate the old window (50, 100) and aggregate the value there
> and then drop it. This would result is wrong aggregated value, as it does
> not consider the previous aggregated values.
> 
> So this is the pressing case I am not able to understand. Maybe I am wrong
> at some basic understanding.
> 
> 
> Next for
> The parameter
>> windowstore.changelog.additional.retention.ms
> 
> How does this relate to rentention.ms param of topic config?
> I create internal topic manually using say rentention.ms=3600000.
> In next release (post kafka_2.10-0.10.0.1) since we support delete of
> internal changelog topic as well and I want it to be retained for say just
> 1 hour.
> So how does that above parameter interfere with this topic level setting.
> Or now I just need to set above config as 3600000 and not add
> rentention.ms=3600000
> while creating internal topic.
> This is just another doubt remaining here.
> 
> Thanks
> Sachin
> 
> 
> 
> On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> Sachin,
>>
>> There is no reason to have an .until() AND a .retain() -- just increase
>> the value of .until()
>>
>> If you have a window of let's say 1h size and you set .until() also to
>> 1h -- you can obviously not process any late arriving data. If you set
>> until() to 2h is this example, you can process data that is up to 1h
>> delayed.
>>
>> So basically, the retention should always be larger than you window size.
>>
>> The parameter
>>> windowstore.changelog.additional.retention.ms
>>
>> is applies to changelog topics that backup window state stores. Those
>> changelog topics are compacted. However, the used key does encode an
>> window ID and thus older data can never be cleaned up by compaction.
>> Therefore, an additional retention time is applied to those topics, too.
>> Thus, if an old window is not updated for this amount of time, it will
>> get deleted eventually preventing this topic to grown infinitely.
>>
>> The value will be determined by until(), i.e., whatever you specify in
>> .until() will be used to set this parameter.
>>
>>
>> -Matthias
>>
>> On 12/12/16 1:07 AM, Sachin Mittal wrote:
>>> Hi,
>>> We are facing the exact problem as described by Matthias above.
>>> We are keeping default until which is 1 day.
>>>
>>> Our record's times tamp extractor has a field which increases with time.
>>> However for short time we cannot guarantee the time stamp is always
>>> increases. So at the boundary ie after 24 hrs we can get records which
>> are
>>> beyond that windows retention period.
>>>
>>> Then it happens like it is mentioned above and our aggregation fails.
>>>
>>> So just to sum up when we get record
>>> 24h + 1 sec (it deletes older window and since the new record belongs to
>>> the new window its gets created)
>>> Now when we get next record of 24 hs - 1 sec since older window is
>> dropped
>>> it does not get aggregated in that bucket.
>>>
>>> I suggest we have another setting next to until call retain which retains
>>> the older windows into next window.
>>>
>>> I think at stream window boundary level it should use a concept of
>> sliding
>>> window. So we can define window like
>>>
>>> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
>> 1000l).untill(7
>>> * 24 * 3600 * 1000l).retain(900 * 1000l)
>>>
>>> So after 7 days it retains the data covered by windows in last 15 minutes
>>> which rolls over the data in them to next window. This way streams work
>>> continuously.
>>>
>>> Please let us know your thoughts on this.
>>>
>>> On another side question on this there is a setting:
>>>
>>> windowstore.changelog.additional.retention.ms
>>> I is not clear what is does. Is this the default for until?
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <matthias@confluent.io
>>>
>>> wrote:
>>>
>>>> Windows are created on demand, ie, each time a new record arrives and
>>>> there is no window yet for it, a new window will get created.
>>>>
>>>> Windows are accepting data until their retention time (that you can
>>>> configure via .until()) passed. Thus, you will have many windows being
>>>> open in parallel.
>>>>
>>>> If you read older data, they will just be put into the corresponding
>>>> windows (as long as window retention time did not pass). If a window was
>>>> discarded already, a new window with this single (later arriving) record
>>>> will get created, the computation will be triggered, you get a result,
>>>> and afterwards the window is deleted again (as it's retention time
>>>> passed already).
>>>>
>>>> The retention time is driven by "stream-time", in internal tracked time
>>>> that only progressed in forward direction. It gets it value from the
>>>> timestamps provided by TimestampExtractor -- thus, per default it will
>>>> be event-time.
>>>>
>>>> -Matthias
>>>>
>>>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
>>>>> I've read this and still have more questions than answers. If my data
>>>> skips
>>>>> about (timewise) what determines when a given window will start / stop
>>>>> accepting new data? What if Im reading data from some time ago?
>>>>>
>>>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <
>> matthias@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Please have a look here:
>>>>>>
>>>>>> http://docs.confluent.io/current/streams/developer-
>>>>>> guide.html#windowing-a-stream
>>>>>>
>>>>>> If you have further question, just follow up :)
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
>>>>>>> Ive added the 'until()' clause to some aggregation steps and it's
>>>> working
>>>>>>> wonders for keeping the size of the state store in useful
>> boundaries...
>>>>>> But
>>>>>>> Im not 100% clear on how it works.
>>>>>>>
>>>>>>> What is implied by the '.until()' clause? What determines when to
>> stop
>>>>>>> receiving further data - is it clock time (since the window was
>>>> created)?
>>>>>>> It seems problematic for it to refer to EventTime as this may bounce
>>>> all
>>>>>>> over the place. For non-overlapping windows a given record can only
>>>> fall
>>>>>>> into a single aggregation period - so when would a value get
>> discarded?
>>>>>>>
>>>>>>> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 *
>>>>>> 1000L).until(10 *
>>>>>>> 1000L))'  - but what is this accomplishing?
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
> 


Re: How does 'TimeWindows.of().until()' work?

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
So is until for future or past?
Say I get the first record at t = 0, until is 100, and my window size is
50 with an advance of 25.
I understand it will create windows (0, 50), (25, 75), (50, 100)
Now at t = 101 it will drop
(0, 50), (25, 75), (50, 100) and create
(101, 150), (125, 175), (150, 200)

Please confirm if this understanding is correct. It is also not clear how
it will handle overlapping windows (75, 125), (175, 225) and so on.

The case that is still not clear: say at t = 102 I get some message with
timestamp 99. What happens then?
Will the result be added to the previous aggregation of (50, 100) or
(75, 125), like it should be?

Or will it recreate the old window (50, 100), aggregate the value there
and then drop it? This would result in a wrong aggregated value, as it
does not consider the previously aggregated values.

So this is the pressing case I am not able to understand. Maybe I am
wrong about some basic understanding.


Next for
The parameter
> windowstore.changelog.additional.retention.ms

How does this relate to the retention.ms param of the topic config?
I create the internal topic manually using, say, retention.ms=3600000.
In the next release (post kafka_2.10-0.10.0.1), since we support delete
for the internal changelog topic as well, I want it to be retained for
just 1 hour.
So how does the above parameter interact with this topic-level setting?
Or do I now just need to set the above config to 3600000 and not add
retention.ms=3600000 while creating the internal topic?
This is just another doubt remaining here.

Thanks
Sachin



On Tue, Dec 13, 2016 at 3:02 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Sachin,
>
> There is no reason to have an .until() AND a .retain() -- just increase
> the value of .until()
>
> If you have a window of let's say 1h size and you set .until() also to
> 1h -- you can obviously not process any late arriving data. If you set
> until() to 2h is this example, you can process data that is up to 1h
> delayed.
>
> So basically, the retention should always be larger than you window size.
>
> The parameter
> > windowstore.changelog.additional.retention.ms
>
> is applies to changelog topics that backup window state stores. Those
> changelog topics are compacted. However, the used key does encode an
> window ID and thus older data can never be cleaned up by compaction.
> Therefore, an additional retention time is applied to those topics, too.
> Thus, if an old window is not updated for this amount of time, it will
> get deleted eventually preventing this topic to grown infinitely.
>
> The value will be determined by until(), i.e., whatever you specify in
> .until() will be used to set this parameter.
>
>
> -Matthias
>
> On 12/12/16 1:07 AM, Sachin Mittal wrote:
> > Hi,
> > We are facing the exact problem as described by Matthias above.
> > We are keeping default until which is 1 day.
> >
> > Our record's times tamp extractor has a field which increases with time.
> > However for short time we cannot guarantee the time stamp is always
> > increases. So at the boundary ie after 24 hrs we can get records which
> are
> > beyond that windows retention period.
> >
> > Then it happens like it is mentioned above and our aggregation fails.
> >
> > So just to sum up when we get record
> > 24h + 1 sec (it deletes older window and since the new record belongs to
> > the new window its gets created)
> > Now when we get next record of 24 hs - 1 sec since older window is
> dropped
> > it does not get aggregated in that bucket.
> >
> > I suggest we have another setting next to until call retain which retains
> > the older windows into next window.
> >
> > I think at stream window boundary level it should use a concept of
> sliding
> > window. So we can define window like
> >
> > TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 *
> 1000l).untill(7
> > * 24 * 3600 * 1000l).retain(900 * 1000l)
> >
> > So after 7 days it retains the data covered by windows in last 15 minutes
> > which rolls over the data in them to next window. This way streams work
> > continuously.
> >
> > Please let us know your thoughts on this.
> >
> > On another side question on this there is a setting:
> >
> > windowstore.changelog.additional.retention.ms
> > I is not clear what is does. Is this the default for until?
> >
> > Thanks
> > Sachin
> >
> >
> > On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <matthias@confluent.io
> >
> > wrote:
> >
> >> Windows are created on demand, ie, each time a new record arrives and
> >> there is no window yet for it, a new window will get created.
> >>
> >> Windows are accepting data until their retention time (that you can
> >> configure via .until()) passed. Thus, you will have many windows being
> >> open in parallel.
> >>
> >> If you read older data, they will just be put into the corresponding
> >> windows (as long as window retention time did not pass). If a window was
> >> discarded already, a new window with this single (later arriving) record
> >> will get created, the computation will be triggered, you get a result,
> >> and afterwards the window is deleted again (as it's retention time
> >> passed already).
> >>
> >> The retention time is driven by "stream-time", in internal tracked time
> >> that only progressed in forward direction. It gets it value from the
> >> timestamps provided by TimestampExtractor -- thus, per default it will
> >> be event-time.
> >>
> >> -Matthias
> >>
> >> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> >>> I've read this and still have more questions than answers. If my data
> >> skips
> >>> about (timewise) what determines when a given window will start / stop
> >>> accepting new data? What if Im reading data from some time ago?
> >>>
> >>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <
> matthias@confluent.io>
> >>> wrote:
> >>>
> >>>> Please have a look here:
> >>>>
> >>>> http://docs.confluent.io/current/streams/developer-
> >>>> guide.html#windowing-a-stream
> >>>>
> >>>> If you have further question, just follow up :)
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> >>>>> Ive added the 'until()' clause to some aggregation steps and it's
> >> working
> >>>>> wonders for keeping the size of the state store in useful
> boundaries...
> >>>> But
> >>>>> Im not 100% clear on how it works.
> >>>>>
> >>>>> What is implied by the '.until()' clause? What determines when to
> stop
> >>>>> receiving further data - is it clock time (since the window was
> >> created)?
> >>>>> It seems problematic for it to refer to EventTime as this may bounce
> >> all
> >>>>> over the place. For non-overlapping windows a given record can only
> >> fall
> >>>>> into a single aggregation period - so when would a value get
> discarded?
> >>>>>
> >>>>> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 *
> >>>> 1000L).until(10 *
> >>>>> 1000L))'  - but what is this accomplishing?
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Re: How does 'TimeWindows.of().until()' work?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Sachin,

There is no reason to have an .until() AND a .retain() -- just increase
the value of .until().

If you have a window of, let's say, 1h size and you set .until() also to
1h -- you can obviously not process any late-arriving data. If you set
until() to 2h in this example, you can process data that is up to 1h
delayed.

So basically, the retention should always be larger than your window
size.
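
As a sketch (again assuming the 0.10.1 API), the example above is simply:

import org.apache.kafka.streams.kstream.TimeWindows;

// 1h window, retained for 2h of stream-time: records arriving up to 1h
// late (relative to stream-time) still land in their retained window
TimeWindows windows = TimeWindows
    .of(60 * 60 * 1000L)           // window size: 1h
    .until(2 * 60 * 60 * 1000L);   // retention: 2h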

The parameter
> windowstore.changelog.additional.retention.ms

applies to the changelog topics that back up window state stores. Those
changelog topics are compacted. However, the key used does encode a
window ID, and thus older data can never be cleaned up by compaction.
Therefore, an additional retention time is applied to those topics, too.
Thus, if an old window is not updated for this amount of time, it will
eventually get deleted, preventing the topic from growing infinitely.

The value will be determined by until(), i.e., whatever you specify in
.until() will be used to set this parameter.
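
For completeness, a minimal sketch of setting this config; the constant
name below is what I believe StreamsConfig uses, and the application id
and bootstrap servers are made up:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// extra retention for window-store changelog topics, applied on top of
// the value derived from until()
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
          3600 * 1000L);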


-Matthias

On 12/12/16 1:07 AM, Sachin Mittal wrote:
> Hi,
> We are facing the exact problem as described by Matthias above.
> We are keeping default until which is 1 day.
> 
> Our record's times tamp extractor has a field which increases with time.
> However for short time we cannot guarantee the time stamp is always
> increases. So at the boundary ie after 24 hrs we can get records which are
> beyond that windows retention period.
> 
> Then it happens like it is mentioned above and our aggregation fails.
> 
> So just to sum up when we get record
> 24h + 1 sec (it deletes older window and since the new record belongs to
> the new window its gets created)
> Now when we get next record of 24 hs - 1 sec since older window is dropped
> it does not get aggregated in that bucket.
> 
> I suggest we have another setting next to until call retain which retains
> the older windows into next window.
> 
> I think at stream window boundary level it should use a concept of sliding
> window. So we can define window like
> 
> TimeWindows.of("test-table", 3600 * 1000l).advanceBy(1800 * 1000l).untill(7
> * 24 * 3600 * 1000l).retain(900 * 1000l)
> 
> So after 7 days it retains the data covered by windows in last 15 minutes
> which rolls over the data in them to next window. This way streams work
> continuously.
> 
> Please let us know your thoughts on this.
> 
> On another side question on this there is a setting:
> 
> windowstore.changelog.additional.retention.ms
> I is not clear what is does. Is this the default for until?
> 
> Thanks
> Sachin
> 
> 
> On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> Windows are created on demand, ie, each time a new record arrives and
>> there is no window yet for it, a new window will get created.
>>
>> Windows are accepting data until their retention time (that you can
>> configure via .until()) passed. Thus, you will have many windows being
>> open in parallel.
>>
>> If you read older data, they will just be put into the corresponding
>> windows (as long as window retention time did not pass). If a window was
>> discarded already, a new window with this single (later arriving) record
>> will get created, the computation will be triggered, you get a result,
>> and afterwards the window is deleted again (as it's retention time
>> passed already).
>>
>> The retention time is driven by "stream-time", in internal tracked time
>> that only progressed in forward direction. It gets it value from the
>> timestamps provided by TimestampExtractor -- thus, per default it will
>> be event-time.
>>
>> -Matthias
>>
>> On 12/11/16 3:47 PM, Jon Yeargers wrote:
>>> I've read this and still have more questions than answers. If my data
>> skips
>>> about (timewise) what determines when a given window will start / stop
>>> accepting new data? What if Im reading data from some time ago?
>>>
>>> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <ma...@confluent.io>
>>> wrote:
>>>
>>>> Please have a look here:
>>>>
>>>> http://docs.confluent.io/current/streams/developer-
>>>> guide.html#windowing-a-stream
>>>>
>>>> If you have further question, just follow up :)
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
>>>>> Ive added the 'until()' clause to some aggregation steps and it's
>> working
>>>>> wonders for keeping the size of the state store in useful boundaries...
>>>> But
>>>>> Im not 100% clear on how it works.
>>>>>
>>>>> What is implied by the '.until()' clause? What determines when to stop
>>>>> receiving further data - is it clock time (since the window was
>> created)?
>>>>> It seems problematic for it to refer to EventTime as this may bounce
>> all
>>>>> over the place. For non-overlapping windows a given record can only
>> fall
>>>>> into a single aggregation period - so when would a value get discarded?
>>>>>
>>>>> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 *
>>>> 1000L).until(10 *
>>>>> 1000L))'  - but what is this accomplishing?
>>>>>
>>>>
>>>>
>>>
>>
>>
> 


Re: How does 'TimeWindows.of().until()' work?

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
We are facing the exact problem described by Matthias above.
We are keeping the default until, which is 1 day.

Our record's timestamp extractor has a field which increases with time.
However, for short periods we cannot guarantee the timestamp always
increases. So at the boundary, i.e. after 24 hrs, we can get records
which are beyond that window's retention period.

Then it happens as mentioned above, and our aggregation fails.

So just to sum up: when we get a record at 24h + 1 sec, it deletes the
older window, and since the new record belongs to the new window, that
window gets created.
Now when we get the next record at 24h - 1 sec, since the older window
is dropped, it does not get aggregated in that bucket.

I suggest we have another setting next to until, called retain, which
retains the older windows into the next window.

I think at the stream window boundary level it should use the concept of
a sliding window. So we could define a window like:

TimeWindows.of("test-table", 3600 * 1000L).advanceBy(1800 * 1000L)
    .until(7 * 24 * 3600 * 1000L).retain(900 * 1000L)

So after 7 days it retains the data covered by the windows in the last
15 minutes, which rolls their data over into the next window. This way
streams keep working continuously.

Please let us know your thoughts on this.

On another side question on this there is a setting:

windowstore.changelog.additional.retention.ms
It is not clear what it does. Is this the default for until?

Thanks
Sachin


On Mon, Dec 12, 2016 at 10:17 AM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Windows are created on demand, ie, each time a new record arrives and
> there is no window yet for it, a new window will get created.
>
> Windows are accepting data until their retention time (that you can
> configure via .until()) passed. Thus, you will have many windows being
> open in parallel.
>
> If you read older data, they will just be put into the corresponding
> windows (as long as window retention time did not pass). If a window was
> discarded already, a new window with this single (later arriving) record
> will get created, the computation will be triggered, you get a result,
> and afterwards the window is deleted again (as it's retention time
> passed already).
>
> The retention time is driven by "stream-time", in internal tracked time
> that only progressed in forward direction. It gets it value from the
> timestamps provided by TimestampExtractor -- thus, per default it will
> be event-time.
>
> -Matthias
>
> On 12/11/16 3:47 PM, Jon Yeargers wrote:
> > I've read this and still have more questions than answers. If my data
> skips
> > about (timewise) what determines when a given window will start / stop
> > accepting new data? What if Im reading data from some time ago?
> >
> > On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> >
> >> Please have a look here:
> >>
> >> http://docs.confluent.io/current/streams/developer-
> >> guide.html#windowing-a-stream
> >>
> >> If you have further question, just follow up :)
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> >>> Ive added the 'until()' clause to some aggregation steps and it's
> working
> >>> wonders for keeping the size of the state store in useful boundaries...
> >> But
> >>> Im not 100% clear on how it works.
> >>>
> >>> What is implied by the '.until()' clause? What determines when to stop
> >>> receiving further data - is it clock time (since the window was
> created)?
> >>> It seems problematic for it to refer to EventTime as this may bounce
> all
> >>> over the place. For non-overlapping windows a given record can only
> fall
> >>> into a single aggregation period - so when would a value get discarded?
> >>>
> >>> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 *
> >> 1000L).until(10 *
> >>> 1000L))'  - but what is this accomplishing?
> >>>
> >>
> >>
> >
>
>

Re: How does 'TimeWindows.of().until()' work?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Windows are created on demand, i.e., each time a new record arrives and
there is no window yet for it, a new window will get created.

Windows accept data until their retention time (which you can configure
via .until()) has passed. Thus, you will have many windows open in
parallel.

If you read older data, it will just be put into the corresponding
windows (as long as the window retention time has not passed). If a
window was already discarded, a new window with this single
(late-arriving) record will get created, the computation will be
triggered, you get a result, and afterwards the window is deleted again
(as its retention time has already passed).

The retention time is driven by "stream-time", an internally tracked
time that only progresses in the forward direction. It gets its value
from the timestamps provided by the TimestampExtractor -- thus, by
default it will be event-time.
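
A custom extractor is a small class; here is a sketch assuming the 0.10.x
single-argument interface (the payload format parsed here is made up):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class PayloadTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        // hypothetical value format: "<epochMillis>,<rest-of-payload>"
        String value = (String) record.value();
        return Long.parseLong(value.split(",", 2)[0]);
    }
}

// registered via (constant name from memory):
// props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
//           PayloadTimestampExtractor.class);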

-Matthias

On 12/11/16 3:47 PM, Jon Yeargers wrote:
> I've read this and still have more questions than answers. If my data skips
> about (timewise) what determines when a given window will start / stop
> accepting new data? What if Im reading data from some time ago?
> 
> On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> 
>> Please have a look here:
>>
>> http://docs.confluent.io/current/streams/developer-
>> guide.html#windowing-a-stream
>>
>> If you have further question, just follow up :)
>>
>>
>> -Matthias
>>
>>
>> On 12/10/16 6:11 PM, Jon Yeargers wrote:
>>> Ive added the 'until()' clause to some aggregation steps and it's working
>>> wonders for keeping the size of the state store in useful boundaries...
>> But
>>> Im not 100% clear on how it works.
>>>
>>> What is implied by the '.until()' clause? What determines when to stop
>>> receiving further data - is it clock time (since the window was created)?
>>> It seems problematic for it to refer to EventTime as this may bounce all
>>> over the place. For non-overlapping windows a given record can only fall
>>> into a single aggregation period - so when would a value get discarded?
>>>
>>> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 *
>> 1000L).until(10 *
>>> 1000L))'  - but what is this accomplishing?
>>>
>>
>>
> 


Re: How does 'TimeWindows.of().until()' work?

Posted by Jon Yeargers <jo...@cedexis.com>.
I've read this and still have more questions than answers. If my data
skips about (timewise), what determines when a given window will start /
stop accepting new data? What if I'm reading data from some time ago?

On Sun, Dec 11, 2016 at 2:22 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Please have a look here:
>
> http://docs.confluent.io/current/streams/developer-
> guide.html#windowing-a-stream
>
> If you have further question, just follow up :)
>
>
> -Matthias
>
>
> On 12/10/16 6:11 PM, Jon Yeargers wrote:
> > Ive added the 'until()' clause to some aggregation steps and it's working
> > wonders for keeping the size of the state store in useful boundaries...
> But
> > Im not 100% clear on how it works.
> >
> > What is implied by the '.until()' clause? What determines when to stop
> > receiving further data - is it clock time (since the window was created)?
> > It seems problematic for it to refer to EventTime as this may bounce all
> > over the place. For non-overlapping windows a given record can only fall
> > into a single aggregation period - so when would a value get discarded?
> >
> > Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 *
> 1000L).until(10 *
> > 1000L))'  - but what is this accomplishing?
> >
>
>

Re: How does 'TimeWindows.of().until()' work?

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Please have a look here:

http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream

If you have further questions, just follow up :)


-Matthias


On 12/10/16 6:11 PM, Jon Yeargers wrote:
> Ive added the 'until()' clause to some aggregation steps and it's working
> wonders for keeping the size of the state store in useful boundaries... But
> Im not 100% clear on how it works.
> 
> What is implied by the '.until()' clause? What determines when to stop
> receiving further data - is it clock time (since the window was created)?
> It seems problematic for it to refer to EventTime as this may bounce all
> over the place. For non-overlapping windows a given record can only fall
> into a single aggregation period - so when would a value get discarded?
> 
> Im using 'groupByKey(),aggregate(..., TimeWindows.of(60 * 1000L).until(10 *
> 1000L))'  - but what is this accomplishing?
>