Posted to users@kafka.apache.org by Srikanth <sr...@gmail.com> on 2016/07/07 14:57:26 UTC

Re: Kafka Streams reducebykey and tumbling window - need final windowed KTable values

Eno,

I was also looking for something similar: a way to output the aggregate value once
the window is "complete".
I'm not sure getting individual updates for an aggregate operator is that
useful.

With KIP-67, will we have access to Windowed[key] (key + timestamp) and
the value?
Does until() clear this store when time passes?

Srikanth
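To make the question concrete: in a tumbling window, a windowed key is just the record key paired with the window its timestamp falls into. That is roughly what Kafka Streams exposes as `Windowed<K>` (a key plus a window with a start and an end). The plain-Java sketch below models only that assignment. `WindowedKey` and `TumblingWindows` are hypothetical names, not Kafka classes, and the sketch assumes non-negative millisecond timestamps and fixed-size, non-overlapping windows.

```java
import java.util.Objects;

// Hypothetical model of a windowed key: the record key plus the window it belongs to.
// Illustration only, not Kafka Streams' own Windowed<K> class.
final class WindowedKey {
    final String key;
    final long windowStartMs; // inclusive
    final long windowEndMs;   // exclusive

    WindowedKey(String key, long windowStartMs, long windowSizeMs) {
        this.key = key;
        this.windowStartMs = windowStartMs;
        this.windowEndMs = windowStartMs + windowSizeMs;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof WindowedKey)) return false;
        WindowedKey other = (WindowedKey) o;
        return key.equals(other.key)
                && windowStartMs == other.windowStartMs
                && windowEndMs == other.windowEndMs;
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, windowStartMs, windowEndMs);
    }

    @Override
    public String toString() {
        return key + "@[" + windowStartMs + "," + windowEndMs + ")";
    }
}

final class TumblingWindows {
    // Assumes non-negative timestamps: each timestamp maps to exactly one window.
    static long windowStartFor(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    static WindowedKey assign(String key, long timestampMs, long windowSizeMs) {
        return new WindowedKey(key, windowStartFor(timestampMs, windowSizeMs), windowSizeMs);
    }
}
```

With 5-second windows, timestamps 12345 and 14999 land in the same window [10000, 15000), while 15000 opens the next one.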

On Thu, Jun 30, 2016 at 4:27 AM, Clive Cox <cl...@yahoo.co.uk.invalid>
wrote:

> Hi Eno,
> I've looked at KIP-67. It looks good, but it's not clear what calls I would
> make to do what I presently need: get access to each windowed store at some
> time soon after the window end time. I can then use the methods specified to
> iterate over keys and values. Can you point me to the relevant
> method/technique for this?
>
> Thanks,
> Clive
>
>
>     On Tuesday, 28 June 2016, 12:47, Eno Thereska <en...@gmail.com>
> wrote:
>
>
>  Hi Clive,
>
> As promised, here is the link to the KIP that just went out today.
> Feedback welcome:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
>
> Thanks
> Eno
>
> > On 27 Jun 2016, at 20:56, Eno Thereska <en...@gmail.com> wrote:
> >
> > Hi Clive,
> >
> > We are working on exposing the state store behind a KTable as part of
> > allowing for queries to the structures currently hidden behind the language
> > (DSL). The KIP should be out today or tomorrow for you to have a look. You
> > can probably do what you need using the low-level Processor API, but then
> > you'd lose the benefits of the DSL and would have to maintain your own
> > structures.
> >
> > Thanks,
> > Eno
> >
> >> On 26 Jun 2016, at 18:42, Clive Cox <cl...@yahoo.co.uk.INVALID>
> wrote:
> >>
> >> Following on from this thread, if I want to iterate over a KTable at
> >> the end of its hopping/tumbling time window, how can I do this myself at
> >> present? Is there a way to access these structures?
> >> If this is not possible, it seems I would need to duplicate and manage
> >> something similar to a list of windowed KTables myself, which is not
> >> really ideal.
> >> Thanks for any help,
> >> Clive
> >>
> >>
> >>  On Monday, 13 June 2016, 16:03, Eno Thereska <en...@gmail.com>
> wrote:
> >>
> >>
> >> Hi Clive,
> >>
> >> For now this optimisation is not present. We're working on it as part
> >> of KIP-63. One manual work-around might be to use a simple key-value store
> >> to deduplicate the final output before sending it to the backend. It could
> >> have a simple policy like "output all values at 1 second intervals" or
> >> "output after 10 records have been received".
> >>
> >> Eno
> >>
> >>
> >>> On 13 Jun 2016, at 13:36, Clive Cox <cl...@yahoo.co.uk.INVALID>
> wrote:
> >>>
> >>>
> >>> Thanks Eno for your comments and references.
> >>> Perhaps I can explain what I want to achieve, and maybe you can
> >>> suggest the correct topology?
> >>> I want to process a stream of events, aggregate them and send the results
> >>> to an analytics backend (InfluxDB), so that rather than sending 1000
> >>> points/sec to the analytics backend, I send far fewer. I'm only
> >>> interested in using the processing time of the event, so in that respect
> >>> there are no "late arriving" events. I was hoping I could use a tumbling
> >>> window: once its end time had passed, I could send the consolidated
> >>> aggregation for that window and then throw the window away.
> >>>
> >>> It sounds like, from the references you give, this is not possible
> >>> at present in Kafka Streams?
> >>>
> >>> Thanks,
> >>> Clive
> >>>
> >>>    On Monday, 13 June 2016, 11:32, Eno Thereska <
> eno.thereska@gmail.com> wrote:
> >>>
> >>>
> >>> Hi Clive,
> >>>
> >>> The behaviour you are seeing is indeed correct (though not necessarily
> >>> optimal in terms of performance, as described in this JIRA:
> >>> https://issues.apache.org/jira/browse/KAFKA-3101)
> >>>
> >>> The key observation is that windows never close/complete. There could
> >>> always be late-arriving events that appear long after a window's end
> >>> time, and those need to be accounted for properly. In Kafka Streams that
> >>> means such late-arriving events continue to update the value of the
> >>> window. As described in the above JIRA, some optimisations could still be
> >>> possible (e.g., batching requests as described in KIP-63
> >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams>),
> >>> but they are not implemented yet.
> >>>
> >>> So your code needs to handle each update.
> >>>
> >>> Thanks
> >>> Eno
> >>>
> >>>
> >>>
> >>>> On 13 Jun 2016, at 11:13, Clive Cox <cl...@yahoo.co.uk.INVALID>
> wrote:
> >>>>
> >>>> Hi,
> >>>> I would like to process a stream with a tumbling window of 5 secs,
> >>>> create aggregated stats for keys and push the final aggregates at the end
> >>>> of each window period to an analytics backend. I have tried doing
> >>>> something like:
> >>>>  stream
> >>>>        .map(...)
> >>>>        .reduceByKey(..., TimeWindows.of("mywindow", 5000L), ...)
> >>>>        .foreach((windowedKey, value) -> { /* send stats */ });
> >>>> But I get every update to the KTable in the foreach.
> >>>> How do I get just the final values once the tumbling window is
> >>>> complete, so I can iterate over them and send them to some external system?
> >>>> Thanks,
> >>>>  Clive
> >>>> PS Using kafka_2.10-0.10.0.0
> >>>>
> >>>
> >>>
> >>
> >>
> >
>
>
>
>
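Eno's manual work-around in the thread above (a key-value store that deduplicates updates and only releases them under a simple policy such as "every 1 second" or "after 10 records") could look roughly like the plain-Java sketch below. `LatestValueBuffer` is a hypothetical helper, not part of Kafka Streams: it keeps only the latest value per key and flushes when either threshold is hit. The time check only runs when a new update arrives.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper illustrating the deduplicating work-around: only the latest value
// per key is kept, and the buffer is flushed after `maxRecords` updates or once
// `flushIntervalMs` has elapsed since the previous flush (checked on each update).
final class LatestValueBuffer<K, V> {
    private final long flushIntervalMs;
    private final int maxRecords;
    private final Map<K, V> latest = new LinkedHashMap<>();
    private long lastFlushMs;
    private int updatesSinceFlush = 0;

    LatestValueBuffer(long flushIntervalMs, int maxRecords, long startMs) {
        this.flushIntervalMs = flushIntervalMs;
        this.maxRecords = maxRecords;
        this.lastFlushMs = startMs;
    }

    // Records one update; returns the batch to send to the backend, or an empty map.
    Map<K, V> put(K key, V value, long nowMs) {
        latest.put(key, value); // a newer update for the same key replaces the older one
        updatesSinceFlush++;
        boolean countReached = updatesSinceFlush >= maxRecords;
        boolean intervalElapsed = nowMs - lastFlushMs >= flushIntervalMs;
        if (!countReached && !intervalElapsed) {
            return Collections.emptyMap();
        }
        Map<K, V> batch = new LinkedHashMap<>(latest);
        latest.clear();
        updatesSinceFlush = 0;
        lastFlushMs = nowMs;
        return batch;
    }
}
```

In a real topology the time-based half would more naturally hang off `ProcessorContext#schedule` and `Processor#punctuate` in the low-level Processor API; in 0.10.0 punctuation follows stream time rather than the wall clock, so it only fires as new records arrive.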

Re: Kafka Streams reducebykey and tumbling window - need final windowed KTable values

Posted by Srikanth <sr...@gmail.com>.
You are right. We are more likely to be interested in the value when the window
expires, and sometimes when the retention limit is reached.
I lost my "time sense" when I read the last email!

I guess we can query a 12:00:00 window at 12:00:05 (5 sec window).
That will be some sort of poll (loop), as opposed to a callback that triggers
at the right moment.
Maybe a Flink-style trigger interface would help too.
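The poll-based approach, combined with Clive's processing-time requirement from earlier in the thread (no late events, throw each window away once its end has passed), can be sketched in plain Java. `ProcessingTimeTumblingSum` is a hypothetical class, not a Kafka Streams API: it sums values per key in tumbling windows and, on each poll, returns and forgets every window whose end is at or before the given time. It assumes records are stamped with the current wall-clock time on arrival, so no record can belong to a window that has already been emitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical processing-time aggregator: sums values per key in tumbling windows and,
// when polled, hands back (and forgets) every window whose end time has passed.
final class ProcessingTimeTumblingSum {
    private final long windowSizeMs;
    // windowStart -> (key -> running sum), ordered by window start
    private final TreeMap<Long, Map<String, Long>> open = new TreeMap<>();

    ProcessingTimeTumblingSum(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // nowMs is the wall-clock arrival time, which doubles as the record timestamp.
    void add(String key, long value, long nowMs) {
        long start = nowMs - (nowMs % windowSizeMs);
        open.computeIfAbsent(start, s -> new HashMap<>()).merge(key, value, Long::sum);
    }

    // Returns "windowStart key=sum" for every window with end <= nowMs, then drops those windows.
    List<String> pollCompleted(long nowMs) {
        List<String> finals = new ArrayList<>();
        while (!open.isEmpty() && open.firstKey() + windowSizeMs <= nowMs) {
            Map.Entry<Long, Map<String, Long>> window = open.pollFirstEntry();
            for (Map.Entry<String, Long> e : new TreeMap<>(window.getValue()).entrySet()) {
                finals.add(window.getKey() + " " + e.getKey() + "=" + e.getValue());
            }
        }
        return finals;
    }
}
```

Polling at the 5-second boundary then emits the window that just ended, which matches querying the 12:00:00 window at 12:00:05.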

Eager to see how it works in practice when released.

Srikanth




Re: Kafka Streams reducebykey and tumbling window - need final windowed KTable values

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Srikanth,

If you do not care about the intermediate results but only want to query
the results when the window is no longer retained, you can consider just
querying the state stores at the time the window is about to "expire"
(i.e. it will no longer be retained). For example, with a tumbling window
size of 5 seconds and a retention period of, say, 1 hour, you can query the
window created at time 12:00:00 at around 12:59:00, the window created at
time 12:00:05 at around 12:59:05, etc.

But bear in mind that in practice you may not only be interested in
querying the aggregated results at the time the window is "dropped",
because the window retention period is for handling possible late-arriving
data and hence it could be much longer than the window length itself.
For example, in your case the tumbling window length is 5 seconds, but you
may want to maintain each window for an hour just for late data, and
querying a windowed result for a 5-second window only one hour later may
not really be useful in real-time applications. So I think in most cases
you'd still want to get some results much earlier, say when the window
length has expired.
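The two query moments in the example can be worked out directly. The sketch below assumes, as the example does, that a window is retained until roughly its start time plus the retention period and that the query runs a minute before that. `WindowTiming` is a hypothetical helper, and the exact purge time in Kafka Streams may differ.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

// Worked example of the two query moments. Assumption taken from the example in the text,
// not from the Kafka Streams implementation: a window is retained until about start + retention.
final class WindowTiming {
    private static final DateTimeFormatter HMS = DateTimeFormatter.ofPattern("HH:mm:ss");

    static String fmt(LocalTime t) {
        return t.format(HMS);
    }

    // Moment the window length has elapsed: the result is "complete" unless data arrives late.
    static LocalTime windowEnd(LocalTime start, Duration size) {
        return start.plus(size);
    }

    // Moment to query if we only want the value just before the window is dropped.
    static LocalTime queryBeforeExpiry(LocalTime start, Duration retention, Duration margin) {
        return start.plus(retention).minus(margin);
    }

    static String describe(LocalTime start, Duration size, Duration retention, Duration margin) {
        return "window " + fmt(start)
                + ": complete at " + fmt(windowEnd(start, size))
                + ", query before expiry at " + fmt(queryBeforeExpiry(start, retention, margin));
    }
}
```

For the window starting at 12:00:00 this gives 12:00:05 as the completion time and 12:59:00 as the query time; for the 12:00:05 window, 12:00:10 and 12:59:05. The nearly one-hour gap between the two moments is the point of the second paragraph.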

Guozhang


On Thu, Jul 14, 2016 at 8:08 AM, Srikanth <sr...@gmail.com> wrote:

> Michael,
>
> > This allows Kafka Streams to retain old window buckets for a period of
> > time in order to wait for the late arrival of records whose timestamps
> > fall within the window interval. If a record arrives after the retention
> > period has passed, the record cannot be processed and is dropped.
>
> So, how do we know if a window is being closed when until() is reached?
> Do we get a callback of some sort when this happens, so that we can query
> the final result for a window?
>
> Otherwise, we have to keep reading the store, say every n secs or m records.
> These intermediate results may or may not be useful.
> That's a lot of load, and we may still miss the final computed value, which
> is needed in most cases.
>
> Srikanth
>
> On Thu, Jul 14, 2016 at 5:04 AM, Michael Noll <mi...@confluent.io>
> wrote:
>
> > Srikanth,
> >
> > > This would be useful in places where we use a key-value store just to
> > > duplicate a KTable for get() operations.
> > > Any rough idea when this is targeted for release?
> >
> > We are aiming to add the queryable state feature into the next release of
> > Kafka.
> >
> >
> > > It's still not clear how to use this for the case this thread was
> > > started for.
> > > Does Kafka Streams keep windows alive forever?
> > > At some point we need to "complete" a window, right?
> >
> > Kafka Streams keeps windows alive until the so-called window retention
> > period expires.
> >
> > Excerpt from the developer guide
> > (http://docs.confluent.io/3.0.0/streams/developer-guide.html#windowing-a-stream):
> >
> >     [For the DSL only]: A local state store is usually needed for a
> >     windowing operation to store recently received records based on the
> >     window interval, while old records in the store are purged after the
> >     specified window retention period. The retention time can be set via
> >     `Windows#until()`.
> >
> > Excerpt from the concepts guide
> > (http://docs.confluent.io/3.0.0/streams/concepts.html#streams-concepts-windowing):
> >
> >     Windowing operations are available in the Kafka Streams DSL, where
> >     users can specify a retention period for the window. This allows Kafka
> >     Streams to retain old window buckets for a period of time in order to
> >     wait for the late arrival of records whose timestamps fall within the
> >     window interval. If a record arrives after the retention period has
> >     passed, the record cannot be processed and is dropped.
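The retention rule in these excerpts can be modelled as follows: stream time advances with the largest timestamp seen, a late record still updates its window while that window is retained, and a record whose window has fallen out of retention is dropped. `RetainedWindowCounts` is a hypothetical plain-Java sketch; it assumes a window is retained until its start plus the retention period and does not reproduce the real store's exact purge boundary.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of window retention (the idea behind Windows#until()).
// Assumption: a window is retained while windowStart + retentionMs > stream time,
// where stream time is the largest record timestamp seen so far.
final class RetainedWindowCounts {
    private final long windowSizeMs;
    private final long retentionMs;
    private final Map<Long, Long> countsByWindowStart = new HashMap<>();
    private long streamTimeMs = Long.MIN_VALUE;
    private long dropped = 0;

    RetainedWindowCounts(long windowSizeMs, long retentionMs) {
        this.windowSizeMs = windowSizeMs;
        this.retentionMs = retentionMs;
    }

    // Returns true if the record was counted, false if its window was already dropped.
    boolean process(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        if (windowStart + retentionMs <= streamTimeMs) {
            dropped++;
            return false;
        }
        countsByWindowStart.merge(windowStart, 1L, Long::sum);
        // Purge every window that has fallen out of the retention period.
        countsByWindowStart.keySet().removeIf(start -> start + retentionMs <= streamTimeMs);
        return true;
    }

    Long count(long windowStart) {
        return countsByWindowStart.get(windowStart);
    }

    long droppedCount() {
        return dropped;
    }
}
```

With 5-second windows and a 60-second retention, once stream time has reached 70000 a late record stamped 66000 is still counted, while one stamped 3000 is dropped.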
> >
> >
> > > Either based on processing time or event time + watermark, etc.
> >
> > The time semantics are based on the timestamp extractor you have configured
> > for your application. The default timestamp extractor is
> > `ConsumerRecordTimestampExtractor`, which yields event-time semantics. If
> > you want processing-time semantics, you need to configure your application
> > to use the `WallclockTimestampExtractor`.
> >
> > Hope this helps,
> > Michael
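The difference between the two extractors can be sketched as follows: with the record's own timestamp, a record lands in the window that timestamp says; with the wall clock, it lands in whichever window is current when it is processed. The interface and classes below are hypothetical plain-Java stand-ins, not Kafka's `TimestampExtractor` classes; in 0.10.0 the real choice is made through the `StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG` setting. A `java.time.Clock` is injected so the wall-clock case can be tested deterministically.

```java
import java.time.Clock;

// Hypothetical stand-in for an extractor choice (not Kafka's TimestampExtractor).
interface TimestampSource {
    long extract(long recordTimestampMs);
}

// Event time: use the timestamp carried by the record.
final class EmbeddedTimestamp implements TimestampSource {
    @Override
    public long extract(long recordTimestampMs) {
        return recordTimestampMs;
    }
}

// Processing time: ignore the record's timestamp and use the current time.
final class WallclockTimestamp implements TimestampSource {
    private final Clock clock;

    WallclockTimestamp(Clock clock) {
        this.clock = clock;
    }

    @Override
    public long extract(long recordTimestampMs) {
        return clock.millis();
    }
}

final class WindowPlacement {
    // Start of the tumbling window the record is assigned to under the given time source.
    static long windowStart(TimestampSource source, long recordTimestampMs, long windowSizeMs) {
        long ts = source.extract(recordTimestampMs);
        return ts - (ts % windowSizeMs);
    }
}
```

A record stamped 7200 ms falls into window 5000 under event time, but into window 20000 if it is processed when the clock reads 20500 ms.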
> >
> >
> >
> >
> > On Wed, Jul 13, 2016 at 8:19 PM, Srikanth <sr...@gmail.com> wrote:
> >
> > > Thanks.
> > >
> > > This would be useful in places where we use a key-value store just to
> > > duplicate a KTable for get() operations.
> > > Any rough idea when this is targeted for release?
> > >
> > > It's still not clear how to use this for the case this thread was
> > > started for.
> > > Does Kafka Streams keep windows alive forever?
> > > At some point we need to "complete" a window, right? Either based on
> > > processing time or event time + watermark, etc.
> > > How can we tie an internal state store query to window completion, i.e.,
> > > get the final value?
> > >
> > > Srikanth
> > >
> > > On Thu, Jul 7, 2016 at 2:05 PM, Eno Thereska <en...@gmail.com>
> > > wrote:
> > >
> > > > Hi Srikanth, Clive,
> > > >
> > > > Today we added some example code to the KIP after feedback from
> > > > the community. There is code that shows how to access a WindowStore
> > > > (in read-only mode).
> > > >
> > > > Thanks
> > > > Eno
> > > >
> > > >
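The read-only window store access mentioned here boils down to a range lookup: for a key, fetch the per-window values whose window start lies in [timeFrom, timeTo]. KIP-67 describes this as `ReadOnlyWindowStore#fetch(key, timeFrom, timeTo)`, returning (window start, value) pairs. The in-memory class below is a hypothetical plain-Java stand-in that only mirrors that shape; it is not the Kafka interface.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a read-only window store: per key, the aggregate
// for each window start; fetch() returns windows whose start lies in [timeFrom, timeTo].
final class InMemoryWindowView<K, V> {
    private final Map<K, TreeMap<Long, V>> byKey = new HashMap<>();

    void put(K key, long windowStartMs, V aggregate) {
        byKey.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStartMs, aggregate);
    }

    NavigableMap<Long, V> fetch(K key, long timeFrom, long timeTo) {
        TreeMap<Long, V> windows = byKey.get(key);
        if (windows == null || timeFrom > timeTo) {
            return new TreeMap<>();
        }
        // Copy the range so callers cannot modify the store (read-only access).
        return new TreeMap<>(windows.subMap(timeFrom, true, timeTo, true));
    }
}
```

Once the window starting at 0 has closed, `fetch(key, 0, 0)` returns exactly that window's value, which is how a periodic poll could pick up just-completed windows.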
> > > > > On 7 Jul 2016, at 15:57, Srikanth <sr...@gmail.com> wrote:
> > > > >
> > > > > Eno,
> > > > >
> > > > > I was also looking for something similar. To output aggregate value
> > > once
> > > > > the window is "complete".
> > > > > I'm not sure getting individual update for an aggregate operator is
> > > that
> > > > > useful.
> > > > >
> > > > > With KIP-67, will we have access to Windowed[key]( key + timestamp)
> > and
> > > > > value?
> > > > > Does until() clear this store when time passes?
> > > > >
> > > > > Srikanth
> > > > >
> > > > > On Thu, Jun 30, 2016 at 4:27 AM, Clive Cox
> > > <clivejcox@yahoo.co.uk.invalid
> > > > >
> > > > > wrote:
> > > > >
> > > > >> Hi Eno,
> > > > >> I've looked at KIP-67. It looks good but its not clear what calls
> I
> > > > would
> > > > >> make to do what I presently need: Get access to each windowed
> store
> > at
> > > > some
> > > > >> time soon after window end time. I can then use the methods
> > specified
> > > to
> > > > >> iterate over keys and values. Can you point me to the relevant
> > > > >> method/technique for this?
> > > > >>
> > > > >> Thanks,
> > > > >> Clive
> > > > >>
> > > > >>
> > > > >>    On Tuesday, 28 June 2016, 12:47, Eno Thereska <
> > > > eno.thereska@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >>
> > > > >> Hi Clive,
> > > > >>
> > > > >> As promised, here is the link to the KIP that just went out today.
> > > > >> Feedback welcome:
> > > > >>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
> > > > >> <
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67:+Queryable+state+for+Kafka+Streams
> > > > >>>
> > > > >>
> > > > >> Thanks
> > > > >> Eno
> > > > >>
> > > > >>> On 27 Jun 2016, at 20:56, Eno Thereska <en...@gmail.com>
> > > wrote:
> > > > >>>
> > > > >>> Hi Clive,
> > > > >>>
> > > > >>> We are working on exposing the state store behind a KTable as
> part
> > of
> > > > >> allowing for queries to the structures currently hidden behind the
> > > > language
> > > > >> (DSL). The KIP should be out today or tomorrow for you to have a
> > look.
> > > > You
> > > > >> can probably do what you need using the low-level processor API
> but
> > > then
> > > > >> you'd lose the benefits of the DSL and would have to maintain your
> > own
> > > > >> structures.
> > > > >>>
> > > > >>> Thanks,
> > > > >>> Eno
> > > > >>>
> > > > >>>> On 26 Jun 2016, at 18:42, Clive Cox
> <clivejcox@yahoo.co.uk.INVALID
> > >
> > > > >> wrote:
> > > > >>>>
> > > > >>>> Following on from this thread, if I want to iterate over a
> KTable
> > at
> > > > >> the end of its hopping/tumbling Time Window how can I do this at
> > > present
> > > > >> myself? Is there a way to access these structures?
> > > > >>>> If this is not possible it would seem I need to duplicate and
> > manage
> > > > >> something similar to a list of windowed KTables myself which is
> not
> > > > really
> > > > >> ideal.
> > > > >>>> Thanks for any help,
> > > > >>>> Clive
> > > > >>>>
> > > > >>>>
> > > > >>>> On Monday, 13 June 2016, 16:03, Eno Thereska <
> > > eno.thereska@gmail.com>
> > > > >> wrote:
> > > > >>>>
> > > > >>>>
> > > > >>>> Hi Clive,
> > > > >>>>
> > > > >>>> For now this optimisation is not present. We're working on it as
> > > part
> > > > >> of KIP-63. One manual work-around might be to use a simple
> Key-value
> > > > store
> > > > >> to deduplicate the final output before sending to the backend. It
> > > could
> > > > >> have a simple policy like "output all values at 1 second
> intervals"
> > or
> > > > >> "output after 10 records have been received".
> > > > >>>>
> > > > >>>> Eno
> > > > >>>>
> > > > >>>>
> > > > >>>>> On 13 Jun 2016, at 13:36, Clive Cox
> > <clivejcox@yahoo.co.uk.INVALID
> > > >
> > > > >> wrote:
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> Thanks Eno for your comments and references.
> > > > >>>>> Perhaps, I can explain what I want to achieve and maybe you can
> > > > >> suggest the correct topology?
> > > > >>>>> I want process a stream of events and do aggregation and send
> to
> > an
> > > > >> analytics backend (Influxdb), so that rather than sending 1000
> > > > points/sec
> > > > >> to the analytics backend, I send a much lower value. I'm only
> > > > interested in
> > > > >> using the processing time of the event so in that respect there
> are
> > no
> > > > >> "late arriving" events.I was hoping I could use a Tumbling window
> > > which
> > > > >> when its end-time had been passed I can send the consolidated
> > > > aggregation
> > > > >> for that window and then throw the Window away.
> > > > >>>>>
> > > > >>>>> It sounds like from the references you give that this is not
> > > possible
> > > > >> at present in Kafka Streams?
> > > > >>>>>
> > > > >>>>> Thanks,
> > > > >>>>> Clive
> > > > >>>>>
> > > > >>>>>   On Monday, 13 June 2016, 11:32, Eno Thereska <
> > > > >> eno.thereska@gmail.com> wrote:
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> Hi Clive,
> > > > >>>>>
> > > > >>>>> The behaviour you are seeing is indeed correct (though not
> > > > necessarily
> > > > >> optimal in terms of performance as described in this JIRA:
> > > > >> https://issues.apache.org/jira/browse/KAFKA-3101 <
> > > > >> https://issues.apache.org/jira/browse/KAFKA-3101>)
> > > > >>>>>
> > > > >>>>> The key observation is that windows never close/complete. There
> > > could
> > > > >> always be late arriving events that appear long after a window's
> end
> > > > >> interval and those need to be accounted for properly. In Kafka
> > Streams
> > > > that
> > > > >> means that such late arriving events continue to update the value
> of
> > > the
> > > > >> window. As described in the above JIRA, some optimisations could
> > still
> > > > be
> > > > >> possible (e.g., batch requests as described in KIP-63 <
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams
> > > > >),
> > > > >> however they are not implemented yet.
> > > > >>>>>
> > > > >>>>> So your code needs to handle each update.
> > > > >>>>>
> > > > >>>>> Thanks
> > > > >>>>> Eno
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>> On 13 Jun 2016, at 11:13, Clive Cox
> > <clivejcox@yahoo.co.uk.INVALID
> > > >
> > > > >> wrote:
> > > > >>>>>>
> > > > >>>>>> Hi,
> > > > >>>>>> I would like to process a stream with a tumbling window of 5secs,
> > > > >> create aggregated stats for keys and push the final aggregates at the end
> > > > >> of each window period to an analytics backend. I have tried doing something
> > > > >> like:
> > > > >>>>>> stream
> > > > >>>>>>       .map
> > > > >>>>>>       .reduceByKey(...
> > > > >>>>>>         , TimeWindows.of("mywindow", 5000L),...)
> > > > >>>>>>       .foreach        {            send stats
> > > > >>>>>>         }
> > > > >>>>>> But I get every update to the ktable in the foreach.
> > > > >>>>>> How do I just get the final values once the TumblingWindow is
> > > > >> complete so I can iterate over them and send to some external system?
> > > > >>>>>> Thanks,
> > > > >>>>>> Clive
> > > > >>>>>> PS Using kafka_2.10-0.10.0.0
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > >
> > > >
> > >
> >
> >
> >
> > --
> >
> > *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
> > Download Apache Kafka and Confluent Platform: www.confluent.io/download
> > <http://www.confluent.io/download>*
> >
>



-- 
-- Guozhang

Re: Kafka Streams reducebykey and tumbling window - need final windowed KTable values

Posted by Srikanth <sr...@gmail.com>.
Michael,

> This allows Kafka Streams to retain old window buckets for a period of
> time in order to wait for the late arrival of records whose timestamps
> fall within the window interval. If a record arrives after the retention
> period has passed, the record cannot be processed and is dropped.

So how do we know when a window is closed once its until() period is
reached? Do we get a callback of some sort when this happens, so that we
can query the final result for that window?

Otherwise, we have to keep reading the store, say every n secs or every m
records. These intermediate results may or may not be useful. That's a
lot of load, and we may still miss the final computed value, which is what
is needed in most cases.

Srikanth

On Thu, Jul 14, 2016 at 5:04 AM, Michael Noll <mi...@confluent.io> wrote:


Re: Kafka Streams reducebykey and tumbling window - need final windowed KTable values

Posted by Michael Noll <mi...@confluent.io>.
Srikanth,

> This would be useful in places where we use a key-value store just to
> duplicate a KTable for get() operations.
> Any rough idea when this is targeted for release?

We are aiming to add the queryable state feature to the next release of
Kafka.


> It's still not clear how to use this for the case this thread was started
> for.
> Does Kafka Streams keep windows alive forever?
> At some point we need to "complete" a window, right?

Kafka Streams keeps windows alive until the so-called window retention
period expires.

Excerpt from
http://docs.confluent.io/3.0.0/streams/developer-guide.html#windowing-a-stream :

    [For the DSL only]: A local state store is usually needed for a
    windowing operation to store recently received records based on the
    window interval, while old records in the store are purged after the
    specified window retention period. The retention time can be set via
    `Windows#until()`.

Excerpt from
http://docs.confluent.io/3.0.0/streams/concepts.html#streams-concepts-windowing :

    Windowing operations are available in the Kafka Streams DSL, where
    users can specify a retention period for the window. This allows Kafka
    Streams to retain old window buckets for a period of time in order to
    wait for the late arrival of records whose timestamps fall within the
    window interval. If a record arrives after the retention period has
    passed, the record cannot be processed and is dropped.
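To make the retention behaviour above concrete, here is a small stdlib-only Java model. It does not use Kafka at all; it only mimics what something like `TimeWindows.of("mywindow", 5000L).until(retentionMs)` asks for. It counts records per tumbling window, emits an update for every record (the per-update output Clive observed), purges windows once stream time (the highest timestamp seen) is more than the retention period past their start, and drops records whose window has already been purged. The class name, the exact purge rule and the `updates`/`dropped` fields are illustrative assumptions; as far as we know the real store purges in coarser time segments rather than window by window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Stdlib-only model of a tumbling-window count with a retention period.
// Illustrative only: not part of any Kafka API, and Kafka Streams purges
// whole time segments rather than individual windows.
class RetainedTumblingCounts {
    private final long sizeMs;       // tumbling window size
    private final long retentionMs;  // how long a window is kept, like until()
    private final TreeMap<Long, Long> counts = new TreeMap<>(); // window start -> count
    private long streamTimeMs = Long.MIN_VALUE;                 // highest timestamp seen
    final List<String> updates = new ArrayList<>(); // one entry per emitted update
    int dropped = 0;                                // records that arrived too late

    RetainedTumblingCounts(long sizeMs, long retentionMs) {
        if (sizeMs <= 0 || retentionMs < sizeMs) {
            throw new IllegalArgumentException("need sizeMs > 0 and retentionMs >= sizeMs");
        }
        this.sizeMs = sizeMs;
        this.retentionMs = retentionMs;
    }

    // Assumes non-negative timestamps in milliseconds.
    void add(long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long cutoff = streamTimeMs - retentionMs;
        // Purge every window whose start is at or before the cutoff.
        counts.headMap(cutoff, true).clear();

        long windowStart = timestampMs - (timestampMs % sizeMs);
        if (windowStart <= cutoff) {
            dropped++; // the record's window has already been purged
            return;
        }
        // Every record produces a new value for its window, just as a
        // windowed KTable emits an update per record.
        long count = counts.merge(windowStart, 1L, Long::sum);
        updates.add("[" + windowStart + "," + (windowStart + sizeMs) + ")=" + count);
    }
}
```

With a 5 s window and 10 s retention, feeding timestamps 1000, 2000, 6000, 3000, 16000, 4000 gives five updates: the late record at 3000 ms still updates the [0,5000) window because it is within the retention period, while the record at 4000 ms, arriving after stream time has reached 16000 ms, is dropped.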


> Either based on processing time or event time + watermark, etc.

The time semantics are based on the timestamp extractor you have configured
for your application.  The default timestamp extractor is
`ConsumerRecordTimestampExtractor`, which yields event-time semantics.  If
you want processing-time semantics, you need to configure your application
to use the `WallclockTimestampExtractor`.
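For the processing-time case, the configuration amounts to one extra property. The sketch below builds it as plain `java.util.Properties` so it compiles without Kafka on the classpath. In an application you would pass these properties to `new KafkaStreams(builder, props)` and would normally use the `StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG` constant instead of the literal key; we believe that constant's value is `"timestamp.extractor"` in 0.10.0.x (later releases deprecated it in favour of `default.timestamp.extractor`). The application id and broker address are placeholders.

```java
import java.util.Properties;

// Builds the properties for a processing-time Kafka Streams application.
// The literal key "timestamp.extractor" is assumed to match
// StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG in 0.10.0.x.
class ProcessingTimeConfig {
    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Replace the default ConsumerRecordTimestampExtractor (event time)
        // with wall-clock time, i.e. processing-time semantics.
        props.put("timestamp.extractor",
                "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
        return props;
    }
}
```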

Hope this helps,
Michael




On Wed, Jul 13, 2016 at 8:19 PM, Srikanth <sr...@gmail.com> wrote:




-- 

*Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
Download Apache Kafka and Confluent Platform: www.confluent.io/download
<http://www.confluent.io/download>*

Re: Kafka Streams reducebykey and tumbling window - need final windowed KTable values

Posted by Srikanth <sr...@gmail.com>.
Thanks.

This would be useful in places where we use a key-value store just to
duplicate a KTable for get() operations.
Any rough idea when this is targeted for release?

It's still not clear how to use this for the case this thread was started
for.
Does Kafka Streams keep windows alive forever?
At some point we need to "complete" a window, right? Either based on
processing time or event time + watermark, etc.
How can we tie the internal state store query to window completion, i.e.,
get the final value?

Srikanth
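Going by Eno's replies in this thread, 0.10.0 offers no "window complete" callback, but the event-time-plus-watermark idea can be approximated by hand, in the spirit of the key-value-store workaround Eno suggested. The hypothetical `FinalWindowBuffer` below is plain Java, not a Kafka API. It would be fed every windowed KTable update (for example from `foreach`, taking the window start from the `Windowed` key) together with an observed time supplied by the caller: the highest record timestamp seen, or simply wall-clock time under the processing-time semantics Clive wanted. It keeps only the latest aggregate per key and window and releases a window's values once the observed time passes the window end plus a grace period; later updates for an already-released window are ignored.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Streams: buffers windowed aggregates
// and emits each window's final values once, when the observed time passes
// windowEnd + graceMs (a simple watermark).
class FinalWindowBuffer {
    private final long sizeMs;
    private final long graceMs;
    // window start -> (key -> latest aggregate); oldest window first
    private final TreeMap<Long, TreeMap<String, Long>> pending = new TreeMap<>();
    private long observedTimeMs = Long.MIN_VALUE;
    final List<String> emitted = new ArrayList<>(); // stands in for "send to the backend"

    FinalWindowBuffer(long sizeMs, long graceMs) {
        if (sizeMs <= 0 || graceMs < 0) {
            throw new IllegalArgumentException("need sizeMs > 0 and graceMs >= 0");
        }
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Call for every windowed update; later values overwrite earlier ones.
    void onUpdate(long windowStartMs, String key, long aggregate, long timeMs) {
        observedTimeMs = Math.max(observedTimeMs, timeMs);
        if (!isClosed(windowStartMs)) {
            pending.computeIfAbsent(windowStartMs, s -> new TreeMap<>()).put(key, aggregate);
        } // else: window already emitted, so the late update is ignored
        flushClosedWindows();
    }

    private boolean isClosed(long windowStartMs) {
        return windowStartMs + sizeMs + graceMs <= observedTimeMs;
    }

    private void flushClosedWindows() {
        Iterator<Map.Entry<Long, TreeMap<String, Long>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TreeMap<String, Long>> window = it.next();
            if (!isClosed(window.getKey())) {
                break; // later windows close later, so nothing else is ready
            }
            for (Map.Entry<String, Long> e : window.getValue().entrySet()) {
                emitted.add(window.getKey() + ":" + e.getKey() + "=" + e.getValue());
            }
            it.remove();
        }
    }
}
```

Because the buffer only advances when updates arrive, the newest window is not released until later data shows up; a periodic timer would be needed to flush it during quiet periods. The grace period plays the same role as the retention period set via `until()`: a larger value tolerates later records at the cost of later output.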

On Thu, Jul 7, 2016 at 2:05 PM, Eno Thereska <en...@gmail.com> wrote:


Re: Kafka Streams reducebykey and tumbling window - need final windowed KTable values

Posted by Eno Thereska <en...@gmail.com>.
Hi Srikanth, Clive,

Today we added some example code to the KIP after feedback from the community. It shows how to access a WindowStore (in read-only mode).

Thanks
Eno
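For the lookup Clive asked about (reading each key's value for a window shortly after that window ends), the KIP's read-only window store is queried by key and a time range. The stdlib-only `WindowStoreModel` below only illustrates that access pattern as we understand it from the KIP: `fetch(key, timeFrom, timeTo)` returns (window start, value) pairs whose window start falls within the range. It is not Kafka's `ReadOnlyWindowStore` interface, and the `long[]` pair representation is an arbitrary choice for the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative model of a key + time-range window lookup; not Kafka's
// ReadOnlyWindowStore. Each result element is {windowStartMs, value}.
class WindowStoreModel {
    // key -> (window start -> aggregate)
    private final Map<String, TreeMap<Long, Long>> byKey = new HashMap<>();

    void put(String key, long windowStartMs, long value) {
        byKey.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStartMs, value);
    }

    // Returns the windows for `key` whose start lies in [timeFromMs, timeToMs],
    // oldest first.
    List<long[]> fetch(String key, long timeFromMs, long timeToMs) {
        List<long[]> result = new ArrayList<>();
        TreeMap<Long, Long> windows = byKey.get(key);
        if (windows == null || timeFromMs > timeToMs) {
            return result;
        }
        for (Map.Entry<Long, Long> e : windows.subMap(timeFromMs, true, timeToMs, true).entrySet()) {
            result.add(new long[] {e.getKey(), e.getValue()});
        }
        return result;
    }
}
```

A value read this way is only final if no late record updates the window afterwards, which is why the retention period still matters.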


> On 7 Jul 2016, at 15:57, Srikanth <sr...@gmail.com> wrote:
> 
> Eno,
> 
> I was also looking for something similar: a way to output the aggregate
> value once the window is "complete".
> I'm not sure getting individual updates for an aggregate operator is that
> useful.
> 
> With KIP-67, will we have access to Windowed[key] (key + timestamp) and
> value?
> Does until() clear this store when time passes?
> 
> Srikanth
> 
> On Thu, Jun 30, 2016 at 4:27 AM, Clive Cox <cl...@yahoo.co.uk.invalid>
> wrote:
> 
>> Hi Eno,
>> I've looked at KIP-67. It looks good but it's not clear what calls I would
>> make to do what I presently need: Get access to each windowed store at some
>> time soon after window end time. I can then use the methods specified to
>> iterate over keys and values. Can you point me to the relevant
>> method/technique for this?
>> 
>> Thanks,
>> Clive
>> 
>> 
>>    On Tuesday, 28 June 2016, 12:47, Eno Thereska <en...@gmail.com>
>> wrote:
>> 
>> 
>> Hi Clive,
>> 
>> As promised, here is the link to the KIP that just went out today.
>> Feedback welcome:
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
>> 
>> Thanks
>> Eno
>> 
>>> On 27 Jun 2016, at 20:56, Eno Thereska <en...@gmail.com> wrote:
>>> 
>>> Hi Clive,
>>> 
>>> We are working on exposing the state store behind a KTable as part of
>>> allowing for queries to the structures currently hidden behind the
>>> language (DSL). The KIP should be out today or tomorrow for you to have
>>> a look. You can probably do what you need using the low-level processor
>>> API but then you'd lose the benefits of the DSL and would have to
>>> maintain your own structures.
>>> 
>>> Thanks,
>>> Eno
>>> 
>>>> On 26 Jun 2016, at 18:42, Clive Cox <cl...@yahoo.co.uk.INVALID>
>> wrote:
>>>> 
>>>> Following on from this thread, if I want to iterate over a KTable at
>>>> the end of its hopping/tumbling Time Window how can I do this at
>>>> present myself? Is there a way to access these structures?
>>>> If this is not possible it would seem I need to duplicate and manage
>>>> something similar to a list of windowed KTables myself which is not
>>>> really ideal.
>>>> Thanks for any help,
>>>> Clive
>>>> 
>>>> 
>>>> On Monday, 13 June 2016, 16:03, Eno Thereska <en...@gmail.com>
>> wrote:
>>>> 
>>>> 
>>>> Hi Clive,
>>>> 
>>>> For now this optimisation is not present. We're working on it as part
>>>> of KIP-63. One manual work-around might be to use a simple Key-value
>>>> store to deduplicate the final output before sending to the backend.
>>>> It could have a simple policy like "output all values at 1 second
>>>> intervals" or "output after 10 records have been received".
>>>> 
>>>> Eno
>>>> 
>>>> 
>>>>> On 13 Jun 2016, at 13:36, Clive Cox <cl...@yahoo.co.uk.INVALID>
>> wrote:
>>>>> 
>>>>> 
>>>>> Thanks Eno for your comments and references.
>>>>> Perhaps I can explain what I want to achieve and you can suggest the
>> correct topology?
>>>>> I want to process a stream of events, aggregate them and send the
>> results to an analytics backend (InfluxDB), so that rather than sending 1000
>> points/sec to the backend, I send far fewer. I'm only interested in the
>> processing time of the event, so in that respect there are no "late
>> arriving" events. I was hoping I could use a tumbling window: once its end
>> time had passed, I could send the consolidated aggregation for that window
>> and then throw the window away.
>>>>> 
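As a sketch of the behaviour described here, a plain-Java tumbling-window aggregator, assuming (as stated) that only processing time matters, so nothing can arrive for a window that has already been emitted. This is not a Kafka Streams API; `TumblingAggregator`, `add` and `closeExpired` are hypothetical names. Each value is assigned to the window starting at `t - t % size`, and a window's totals are emitted once, then dropped, when the current time reaches `start + size`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical processing-time tumbling-window aggregator (not a Kafka Streams API).
final class TumblingAggregator {
    // A window that has ended: its start time and the final sum per key.
    static final class ClosedWindow {
        final long start;
        final Map<String, Long> totals;

        ClosedWindow(long start, Map<String, Long> totals) {
            this.start = start;
            this.totals = totals;
        }
    }

    private final long sizeMs;
    // window start -> (key -> running sum); TreeMap keeps windows ordered by start
    private final TreeMap<Long, Map<String, Long>> open = new TreeMap<>();

    TumblingAggregator(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // Adds a value to the window containing processingTimeMs. Assumes times
    // never go back into an already-emitted window (processing time only);
    // otherwise that window would be re-created and emitted a second time.
    void add(String key, long value, long processingTimeMs) {
        long start = processingTimeMs - (processingTimeMs % sizeMs);
        open.computeIfAbsent(start, s -> new HashMap<>()).merge(key, value, Long::sum);
    }

    // Emits, then forgets, every window whose end (start + size) is <= nowMs.
    List<ClosedWindow> closeExpired(long nowMs) {
        List<ClosedWindow> closed = new ArrayList<>();
        while (!open.isEmpty() && open.firstKey() + sizeMs <= nowMs) {
            Map.Entry<Long, Map<String, Long>> oldest = open.pollFirstEntry();
            closed.add(new ClosedWindow(oldest.getKey(), oldest.getValue()));
        }
        return closed;
    }
}
```

With `new TumblingAggregator(5000L)` and a timer calling `closeExpired(System.currentTimeMillis())` every second or so, each 5-second window's totals would be handed back once and could then be written to InfluxDB as a single batch.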
>>>>> It sounds like from the references you give that this is not possible
>> at present in Kafka Streams?
>>>>> 
>>>>> Thanks,
>>>>> Clive
>>>>> 
>>>>>   On Monday, 13 June 2016, 11:32, Eno Thereska <
>> eno.thereska@gmail.com> wrote:
>>>>> 
>>>>> 
>>>>> Hi Clive,
>>>>> 
>>>>> The behaviour you are seeing is indeed correct (though not necessarily
>> optimal in terms of performance as described in this JIRA:
>> https://issues.apache.org/jira/browse/KAFKA-3101)
>>>>> 
>>>>> The key observation is that windows never close/complete. There could
>> always be late-arriving events that appear long after a window's end time,
>> and those need to be accounted for properly. In Kafka Streams that means
>> such late-arriving events continue to update the value of the window. As
>> described in the above JIRA, some optimisations could still be made (e.g.,
>> batching requests as described in KIP-63 <
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams>);
>> however, they are not implemented yet.
>>>>> 
>>>>> So your code needs to handle each update.
>>>>> 
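One way to "handle each update" (an assumption, not something prescribed in this thread) is to make the downstream write idempotent: key every write by the record key plus the window start, so a later update, including one caused by a late-arriving event, overwrites the earlier value instead of being added to it. The `UpsertSink` and `WindowedKey` classes below are hypothetical stand-ins for the backend and for the windowed key the DSL produces.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sink that treats every windowed aggregate update as an upsert.
final class UpsertSink {
    // Identifies one aggregate: the record key plus the start of its window.
    static final class WindowedKey {
        final String key;
        final long windowStart;

        WindowedKey(String key, long windowStart) {
            this.key = key;
            this.windowStart = windowStart;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof WindowedKey)) return false;
            WindowedKey other = (WindowedKey) o;
            return windowStart == other.windowStart && key.equals(other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, windowStart);
        }
    }

    private final Map<WindowedKey, Long> latest = new HashMap<>();
    private int writes; // number of updates received (one backend write each)

    // Called for every update the windowed table emits; the last value wins.
    void onUpdate(String key, long windowStart, long aggregate) {
        latest.put(new WindowedKey(key, windowStart), aggregate);
        writes++;
    }

    // Current value for one (key, window), or null if never written.
    Long current(String key, long windowStart) {
        return latest.get(new WindowedKey(key, windowStart));
    }

    int distinctAggregates() {
        return latest.size();
    }

    int writes() {
        return writes;
    }
}
```

The gap between `writes()` and `distinctAggregates()` is the kind of redundant downstream traffic that the KIP-63 work mentioned above is meant to reduce.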
>>>>> Thanks
>>>>> Eno
>>>>> 
>>>>> 
>>>>> 
>>>>>> On 13 Jun 2016, at 11:13, Clive Cox <cl...@yahoo.co.uk.INVALID>
>> wrote:
>>>>>> 
>>>>>> Hi,
>>>>>> I would like to process a stream with a tumbling window of 5 secs,
>> create aggregated stats for keys, and push the final aggregates at the end
>> of each window period to an analytics backend. I have tried doing something
>> like:
>>>>>> stream
>>>>>>       .map(...)
>>>>>>       .reduceByKey(...,
>>>>>>           TimeWindows.of("mywindow", 5000L), ...)
>>>>>>       .foreach((windowedKey, value) -> {
>>>>>>           // send stats
>>>>>>       });
>>>>>> But I get every update to the KTable in the foreach.
>>>>>> How do I get just the final values once the tumbling window is
>> complete, so I can iterate over them and send them to some external system?
>>>>>> Thanks,
>>>>>> Clive
>>>>>> PS Using kafka_2.10-0.10.0.0
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>> 