Posted to users@kafka.apache.org by Navneeth Krishnan <re...@gmail.com> on 2021/02/15 08:07:56 UTC

Window Store

Hi All,

I have a question about how I can use window stores to implement the
following use case. Thanks in advance for any help.

A user record is created when the user first logs in, and the record needs
to be cleaned up after 10 minutes of inactivity. So each user has a TTL,
and the TTL is extended every time the user is active; the record should
only expire once the user has been inactive for a full 10-minute period.
We currently use the Processor API (PAPI) for all our topologies, and I
was thinking of implementing this with a punctuator.

My initial approach was a KV store with the user as the key and the TTL as
the value, plus a scheduled task that runs every minute and scans all
records for a TTL earlier than the current timestamp. The problem with
this approach is performance: with more than 1M records, the scan takes
more than a few seconds.

My next approach is to use a window store together with a KV store. The
window store holds each user and the corresponding TTL rounded to the
nearest minute. Every minute, I fetch all keys between (current time - 1
min) and the current time, iterate over them, and check in the KV store
whether the TTL is still the same or whether an update has arrived since.
If the TTL is unchanged, the user is evicted.
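
A minimal sketch of the second approach, for illustration only. It does not
use the Kafka Streams API: two plain in-memory maps stand in for the KV store
and the window store, the class and method names (MinuteBucketExpiry,
onActivity, punctuate) are hypothetical, and the TTL is truncated to the
minute rather than rounded. It also assumes punctuate runs once per minute
without skipping a minute.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in: plain maps play the role of the KV store and the window store. */
public class MinuteBucketExpiry {
    static final long MINUTE_MS = 60_000L;
    static final long TTL_MS = 10 * MINUTE_MS;

    // userId -> expiry time in ms (the "KV store")
    private final Map<String, Long> expiryByUser = new HashMap<>();
    // expiry minute -> users whose TTL fell into that minute when written (the "window store")
    private final Map<Long, Set<String>> usersByMinute = new HashMap<>();

    /** Called for every activity event; older bucket entries for the user are left in place. */
    public void onActivity(String userId, long nowMs) {
        long expiry = nowMs + TTL_MS;
        expiryByUser.put(userId, expiry);
        usersByMinute.computeIfAbsent(expiry / MINUTE_MS, m -> new HashSet<>()).add(userId);
    }

    /** Called once per minute; inspects only the bucket for the minute that has just elapsed. */
    public List<String> punctuate(long nowMs) {
        List<String> evicted = new ArrayList<>();
        long elapsedMinute = nowMs / MINUTE_MS - 1;
        Set<String> candidates = usersByMinute.remove(elapsedMinute);
        if (candidates == null) {
            return evicted;
        }
        for (String userId : candidates) {
            Long expiry = expiryByUser.get(userId);
            // Evict only if the KV store still holds the TTL that put the user into this bucket.
            if (expiry != null && expiry / MINUTE_MS == elapsedMinute) {
                expiryByUser.remove(userId);
                evicted.add(userId);
            }
        }
        return evicted;
    }
}
```

The cost of each punctuation is proportional to the size of one minute
bucket instead of the whole user population, at the price of one KV lookup
per candidate.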

Is there a better, more scalable solution for this?

Thanks

Re: Window Store

Posted by Navneeth Krishnan <re...@gmail.com>.
Thanks a lot Guozhang. I will try and let you know.

Really appreciate all the help. This community has been amazing.

Thanks

On Tue, Feb 23, 2021 at 5:48 PM Guozhang Wang <wa...@gmail.com> wrote:

> Sorry, I was not very clear before: by "WindowStore" I meant implementing
> your own customized store on top of a kvStore, where the key is a combo
> <timestamp, key>. Note that you put the timestamp first and then the key
> in your serialization format, so that you can range-fetch using just the
> timestamp prefix. In fact, the `WindowStore` that we provide follows the
> same design principle, but its combo key is ordered as <key, timestamp>,
> so a time-based range fetch is not as efficient: you'd need to fetch a
> much larger range and then filter out a lot of records.
>
>
> Guozhang

Re: Window Store

Posted by Guozhang Wang <wa...@gmail.com>.
Sorry, I was not very clear before: by "WindowStore" I meant implementing
your own customized store on top of a kvStore, where the key is a combo
<timestamp, key>. Note that you put the timestamp first and then the key
in your serialization format, so that you can range-fetch using just the
timestamp prefix. In fact, the `WindowStore` that we provide follows the
same design principle, but its combo key is ordered as <key, timestamp>,
so a time-based range fetch is not as efficient: you'd need to fetch a
much larger range and then filter out a lot of records.
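
A rough sketch of such a timestamp-first key layout, under stated
assumptions. The class TimestampFirstKey and its methods are hypothetical,
and a TreeMap ordered by unsigned byte comparison stands in for a
byte-ordered key-value store. Timestamps are assumed to be non-negative, so
that the 8-byte big-endian encoding sorts in the same order as the numbers
themselves.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

public class TimestampFirstKey {
    /** 8-byte big-endian timestamp followed by the UTF-8 bytes of the user id. */
    public static byte[] encode(long timestampMs, String userId) {
        byte[] id = userId.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + id.length).putLong(timestampMs).put(id).array();
    }

    /** Recovers the user id from the bytes that follow the timestamp prefix. */
    public static String userIdOf(byte[] comboKey) {
        return new String(comboKey, Long.BYTES, comboKey.length - Long.BYTES, StandardCharsets.UTF_8);
    }

    /** A sorted map ordered by unsigned bytes, standing in for a byte-ordered store. */
    public static TreeMap<byte[], Boolean> newIndex() {
        Comparator<byte[]> unsignedOrder = Arrays::compareUnsigned;
        return new TreeMap<>(unsignedOrder);
    }

    /** Users with a timestamp in [fromMs, toMs), found with a single range scan on the prefix. */
    public static List<String> usersBetween(TreeMap<byte[], Boolean> index, long fromMs, long toMs) {
        List<String> users = new ArrayList<>();
        // A bare 8-byte prefix sorts before every key that starts with the same timestamp.
        for (byte[] key : index.subMap(encode(fromMs, ""), encode(toMs, "")).keySet()) {
            users.add(userIdOf(key));
        }
        return users;
    }
}
```

Because the timestamp leads the key, all entries for a time range are
contiguous, so the scan touches only the entries it returns. With a
<key, timestamp> layout the same query would have to walk every key.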


Guozhang

On Tue, Feb 23, 2021 at 4:04 PM Navneeth Krishnan <re...@gmail.com>
wrote:

> Thanks Guozhang.
>
> I don't see the remove method in window stores. Am I missing something? It
> would be very nice to implement the optimization you had mentioned.
>
> Thanks


-- 
-- Guozhang

Re: Window Store

Posted by Navneeth Krishnan <re...@gmail.com>.
Thanks Guozhang.

I don't see the remove method in window stores. Am I missing something? It
would be very nice to implement the optimization you had mentioned.

Thanks

On Tue, Feb 23, 2021 at 11:11 AM Guozhang Wang <wa...@gmail.com> wrote:

> I see. In that case I think your design with a KVstore plus a book-keeping
> window store would work better. One minor optimization you can try though,
> is that instead of trying to check if the TTL has changed or not when
> expiring from the window store, you can try to delete from the window store
> whenever you are updating the kv-store. More specifically, when you update
> the kv-store, do sth. like this:
>
> value = kvStore.get(k);  // value also encodes the timestamp, e.g. see the
>                          // "TimestampedKeyValueStore" interface
> if (value != null && value != v)
>   // v is the new value you want to put; the combo-key is <timestamp, key>,
>   // where the timestamp is extracted from the old value
>   windowStore.remove(combo-key);
>
> kvStore.put(k, v);
> windowStore.put(combo-key);  // the new combo-key is <new-timestamp-of-v, key>
>
> Later when you expire, you do not need to check on kvStore if the value's
> timestamp has changed or not.
>
>
>
>
> On Sun, Feb 21, 2021 at 9:17 AM Navneeth Krishnan <
> reachnavneeth2@gmail.com>
> wrote:
>
> > Thanks Liam & Guozhang.
> >
> > First of all, we use PAPI in our entire topology and we would like to
> > retain it that way rather than combining it with the DSL. Secondly, I was
> > also leaning towards a session store, but the problem I found with the
> > session store is that I cannot get all the expired sessions without keys,
> > whereas the window store has the option to get all keys by range. Ideally
> > I would like to have a punctuate function which finds all the expired
> > records and sends them downstream. I looked at
> > KStreamSessionWindowAggregate, but it looks like we need a new value
> > coming in for the key to even send updates. In my case there might not be
> > any activity at all, but I still need to send the delete event.
> >
> > Here is how we want it to work:
> > T -> User1 (Active event)
> > T+5 -> User1 (Active event)
> > T+15 -> User1 (Delete event, since the user has been inactive for a
> > 10 min period)
> >
> > Thanks
> >
> > On Fri, Feb 19, 2021 at 12:19 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> > > Hello Navneeth,
> > >
> > > I would agree with Liam that a session store seems a good fit for your
> > > case. But note that session stores do not expire sessions by
> > > themselves; it is still the processor node's job to find the already
> > > expired sessions and emit results / delete them. You can take a look
> > > at KStreamSessionWindowAggregate inside the Kafka code base for
> > > reference:
> > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
> > >
> > >
> > > Guozhang
> > >
> > > On Thu, Feb 18, 2021 at 1:21 PM Liam Clarke-Hutchinson <
> > > liam.clarke@adscale.co.nz> wrote:
> > >
> > > > Hmmm, thanks Navneeth,
> > > >
> > > > I feel like a session store set to an inactivity period of 10 minutes,
> > > > suppressed until the session window closes, combined with a
> > > > GlobalKTable would be how I'd start to approach this in the DSL, with
> > > > the topology below. I have no idea if my ASCII art below will survive
> > > > email formatting, so I'll try to explain. User ids stream into the
> > > > GlobalKTable, and also into the session store. After 10 minutes of
> > > > inactivity for a given user id key, the session expires, and the
> > > > session store emits user_id -> some_value. I'd then map some_value to
> > > > null, to take advantage of KTable semantics where `k -> null` is
> > > > treated as a delete for key k, so an inactive user would be deleted
> > > > from the ktable. You could then periodically query the ktable's
> > > > key-value store for outside emission.
> > > >
> > > > That said, this is only how I'd start to explore the problem, and
> > > > there are obvious questions that need to be answered first, like how
> > > > much state you would end up storing in the session store, etc. I'm
> > > > hoping someone like John Roesler, who has far better insights into
> > > > Kafka Streams, might weigh in here.
> > > >
> > > >
> > > > user ids ------------------------------------------------------>
> > > > globalktable <---- keyValueStore periodically queried.
> > > >       \------------> session store ----> map (user_id -> null) --/
> > > >
> > > > Good luck,
> > > >
> > > > Liam
> > > >
> > > > On Thu, Feb 18, 2021 at 7:49 AM Navneeth Krishnan <
> > > > reachnavneeth2@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Liam,
> > > > >
> > > > > The use case is to stream all data and send it to storage after
> > > > > processing. In addition, when a user is inactive for a 10 min
> > > > > period, we send a special event that marks the user as inactive.
> > > > > I'm trying to implement the special event here.
> > > > >
> > > > > Thanks
> > > > >
> > > > >
> > > > > On Tue, Feb 16, 2021 at 1:18 AM Liam Clarke-Hutchinson <
> > > > > liam.clarke@adscale.co.nz> wrote:
> > > > >
> > > > > > Hey Navneeth,
> > > > > >
> > > > > > So to understand your problem better - do you only want to stream
> > > > > > users active within 10 minutes to storage?
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Liam
> > > > > >
> > > > > > On Tue, Feb 16, 2021 at 9:50 AM Navneeth Krishnan <
> > > > > > reachnavneeth2@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > It's just for emitting to data storage. There is no join here.
> > > > > > >
> > > > > > > Thanks
> > > > > > >
> > > > > > > On Mon, Feb 15, 2021 at 1:42 AM Liam Clarke-Hutchinson <
> > > > > > > liam.clarke@adscale.co.nz> wrote:
> > > > > > >
> > > > > > > > Hi Navneeth,
> > > > > > > >
> > > > > > > > What is the purpose of holding these user records? Is it to
> > > > > > > > join against other streams, or emit to data storage?
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > >
> > > > > > > > Liam Clarke-Hutchinson
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, 15 Feb. 2021, 9:08 pm Navneeth Krishnan, <
> > > > > > > reachnavneeth2@gmail.com
> > > > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi All,
> > > > > > > > >
> > > > > > > > > I have a question about how I can use window stores to
> > achieve
> > > > this
> > > > > > use
> > > > > > > > > case. Thanks for all the help.
> > > > > > > > >
> > > > > > > > > A user record will be created when the user first logins
> and
> > > the
> > > > > > > records
> > > > > > > > > needs to be cleaned up after 10 mins of inactivity. Thus
> for
> > > each
> > > > > > user
> > > > > > > > > there will be a TTL but the TTL value will be updated each
> > time
> > > > > when
> > > > > > > the
> > > > > > > > > user is active before he becomes inactive for the entire 10
> > min
> > > > > > period.
> > > > > > > > We
> > > > > > > > > are currently using PAPI for all our topologies and I was
> > > > thinking
> > > > > of
> > > > > > > > > implementing it using a punctuator.
> > > > > > > > >
> > > > > > > > > My initial logic was to have a KV store with each user as
> key
> > > and
> > > > > TTL
> > > > > > > as
> > > > > > > > > the value and run a scheduled task every minute that looks
> at
> > > all
> > > > > the
> > > > > > > > > records which have TTL value lesser than the timestamp. But
> > the
> > > > > > problem
> > > > > > > > in
> > > > > > > > > this approach was performance. When there are more than 1M
> > > > records
> > > > > it
> > > > > > > > takes
> > > > > > > > > more than a few seconds to complete this task.
> > > > > > > > >
> > > > > > > > > Next approach is to have a window store and a KV store.
> > Window
> > > > > store
> > > > > > > will
> > > > > > > > > have each user and corresponding TTL rounded to the nearest
> > > > minute.
> > > > > > > Then
> > > > > > > > > find all keys between the current time and current time -
> > 1min.
> > > > > Then
> > > > > > > > > iterate these keys and use the KV store to find if the TTL
> > > value
> > > > is
> > > > > > > still
> > > > > > > > > the same or if we have received any updates after that. If
> > not
> > > > then
> > > > > > the
> > > > > > > > > user will be evicted.
> > > > > > > > >
> > > > > > > > > What would be a better and much more scalable solution for
> > > this.
> > > > > > > > >
> > > > > > > > > Thanks
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>

Re: Window Store

Posted by Guozhang Wang <wa...@gmail.com>.
I see. In that case I think your design with a KV store plus a book-keeping
window store would work better. One minor optimization you can try, though:
instead of checking whether the TTL has changed when expiring from the
window store, delete from the window store whenever you update the KV
store. More specifically, when you update the KV store, do something like
this:

// the stored value also encodes the timestamp, e.g. see the
// "TimestampedKeyValueStore" interface
old = kvStore.get(k);
if (old != null && old != v) {
  // v is the new value you want to put; the old combo-key is
  // <timestamp, key>, where the timestamp is extracted from old
  windowStore.remove(old-combo-key);
}

kvStore.put(k, v);
windowStore.put(new-combo-key);  // <new-timestamp-of-v, key>

Later, when you expire, you do not need to check the kvStore to see whether
the value's timestamp has changed.
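
The update path above can be sketched in plain Java. This is a minimal,
hedged illustration rather than Kafka Streams code: ExpiryIndex, ComboKey,
touch and expire are hypothetical names, and a HashMap and a TreeSet stand
in for the KV store and the book-keeping store. The combo key sorts by
timestamp first, so expiry is a range scan over the oldest entries instead
of a scan over every user.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// In-memory stand-ins for the two stores (assumption: a real topology would
// use Kafka Streams state stores; nothing here is Kafka API).
class ExpiryIndex {
    // Hypothetical combo key: timestamp first, then user key.
    static final class ComboKey implements Comparable<ComboKey> {
        final long timestamp;
        final String key;

        ComboKey(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }

        @Override
        public int compareTo(ComboKey other) {
            int byTime = Long.compare(timestamp, other.timestamp);
            return byTime != 0 ? byTime : key.compareTo(other.key);
        }
    }

    private final Map<String, Long> kvStore = new HashMap<>();  // key -> last-active timestamp
    private final TreeSet<ComboKey> index = new TreeSet<>();    // book-keeping store

    // Called for every activity event.
    void touch(String key, long timestamp) {
        Long old = kvStore.get(key);
        if (old != null) {
            index.remove(new ComboKey(old, key)); // drop the stale index entry
        }
        kvStore.put(key, timestamp);
        index.add(new ComboKey(timestamp, key));
    }

    // Called from the periodic task: removes and returns every key whose
    // last activity is at or before (now - ttl).
    List<String> expire(long now, long ttl) {
        List<String> expired = new ArrayList<>();
        // Everything strictly below <cutoff + 1, ""> has timestamp <= cutoff.
        ComboKey bound = new ComboKey(now - ttl + 1, "");
        Iterator<ComboKey> it = index.headSet(bound, false).iterator();
        while (it.hasNext()) {
            ComboKey entry = it.next();
            expired.add(entry.key);
            kvStore.remove(entry.key);
            it.remove();
        }
        return expired;
    }

    int size() {
        return kvStore.size();
    }

    public static void main(String[] args) {
        ExpiryIndex idx = new ExpiryIndex();
        idx.touch("user1", 0);  // active at T
        idx.touch("user1", 5);  // active again at T+5
        System.out.println(idx.expire(10, 10)); // user1 was active 5 ago, not expired
        System.out.println(idx.expire(15, 10)); // user1 idle since T+5, expired
    }
}
```

Because touch removes the stale index entry on every update, expire can
trust whatever it finds in the index and never has to consult the KV store.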




On Sun, Feb 21, 2021 at 9:17 AM Navneeth Krishnan <re...@gmail.com>
wrote:

> Thanks Liam & Guozhang.
>
> First of all, we use PAPI in our entire topology and we would like to
> retain it that way rather than combining with DSL. Secondly, even I was
> more leaning towards session store but the problem I found with session
> store is I cannot get all the expired sessions without keys where as
> windowstore has the option to get all keys by range. Ideally I would like
> to have a punctuate function which finds all the expired records and send
> it to downstream. I looked at KStreamSessionWindowAggregate but it looks
> like we need a new value coming in for the key to even send updates. In my
> case there might not be any activity at all but I still need to send the
> delete event.
>
> Here is how we want it to work
> T -> User1 (Active event)
> T+5 -> User1 (Active event)
> T+15 -> User1 (Delete event - Since the user is inactive for a 10 min
> period)
>
> Thanks


-- 
-- Guozhang

Re: Window Store

Posted by Navneeth Krishnan <re...@gmail.com>.
Thanks Liam & Guozhang.

First of all, we use PAPI in our entire topology and we would like to
keep it that way rather than combining it with the DSL. Secondly, I was
also leaning towards a session store, but the problem I found is that I
cannot get all the expired sessions without knowing the keys, whereas a
window store has the option to fetch all keys by range. Ideally I would
like to have a punctuate function which finds all the expired records and
sends them downstream. I looked at KStreamSessionWindowAggregate, but it
looks like a new value needs to arrive for the key before it sends any
updates. In my case there might not be any activity at all, but I still
need to send the delete event.

Here is how we want it to work (times in minutes):
T -> User1 (Active event)
T+5 -> User1 (Active event)
T+15 -> User1 (Delete event, since the user has been inactive for a 10 min
period)
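
The timeline above can be checked with a small simulation. This is a hedged
sketch in plain Java, not Kafka Streams code: InactivityTracker, onActive
and punctuate are hypothetical names, punctuate stands in for a punctuator
callback fired once a minute, and time is counted in whole minutes from T.
Note that this version scans every user on each call, which is the approach
that became slow beyond 1M records; it only illustrates the desired output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class InactivityTracker {
    private final long timeoutMinutes;
    private final Map<String, Long> lastActive = new HashMap<>();

    InactivityTracker(long timeoutMinutes) {
        this.timeoutMinutes = timeoutMinutes;
    }

    // Record an active event for the user at the given minute.
    void onActive(String user, long minute) {
        lastActive.put(user, minute);
    }

    // Stand-in for a punctuator callback: emits one delete event for each
    // user idle for at least the timeout, even if no new record arrived.
    List<String> punctuate(long nowMinute) {
        List<String> deletes = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastActive.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMinute - entry.getValue() >= timeoutMinutes) {
                deletes.add("DELETE " + entry.getKey() + " @T+" + nowMinute);
                it.remove();
            }
        }
        return deletes;
    }

    public static void main(String[] args) {
        InactivityTracker tracker = new InactivityTracker(10);
        for (long minute = 0; minute <= 20; minute++) {
            if (minute == 0 || minute == 5) {
                tracker.onActive("User1", minute);
            }
            for (String event : tracker.punctuate(minute)) {
                System.out.println(event);
            }
        }
    }
}
```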

Thanks

On Fri, Feb 19, 2021 at 12:19 PM Guozhang Wang <wa...@gmail.com> wrote:

> Hello Navneeth,
>
> I would agree with Liam that a session store seems a good fit for your
> case. But note that session stores would not expire a session themselves
> and it is still the processor node's job to find those already expired
> sessions and emit results / delete. You can take a look at
> the KStreamSessionWindowAggregate inside Kafka code base (
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
> )
> for a reference.
>
>
> Guozhang

Re: Window Store

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Navneeth,

I would agree with Liam that a session store seems a good fit for your
case. But note that session stores would not expire a session themselves
and it is still the processor node's job to find those already expired
sessions and emit results / delete. You can take a look at
the KStreamSessionWindowAggregate inside Kafka code base (
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java)
for a reference.


Guozhang

On Thu, Feb 18, 2021 at 1:21 PM Liam Clarke-Hutchinson <
liam.clarke@adscale.co.nz> wrote:

> Hmmm, thanks Navneeth,
>
> I feel like a session store set to an inactivity period of 10 minutes,
> suppressed until session window closed, combined with a GlobalKTable would
> be how I'd start to approach this in the DSL, with the below topology. I
> have no idea if my ASCII art below will survive email formatting, so I'll
> try to explain. User ids stream into the GlobalKTable, and also into the
> session store. After 10 minutes of inactivity for a given user id key, the
> session expires, and the session store emits the user_id -> some_value. I'd
> then map the some_value to null, to take advantage of KTable semantics
> where `k -> null` is treated as a delete for key k, so an inactive user
> would be deleted from the ktable. You could then periodically query the
> ktable's key-value store for outside emission.
>
> That said, this is only how I'd start to explore the problem, and there are
> obvious questions that need to be answered first like how much state would
> you end up storing in the session store, etc. I'm hoping someone like John
> Roesler who has far better insights into Kafka Streams might weigh in here.
>
>
> user ids ------------------------------------------------------>
> globalktable <---- keyValueStore periodically queried.
>       \------------> session store ----> map (user_id -> null) --/
>
> Good luck,
>
> Liam


-- 
-- Guozhang

Re: Window Store

Posted by Liam Clarke-Hutchinson <li...@adscale.co.nz>.
Hmmm, thanks Navneeth,

I feel like a session store set to an inactivity period of 10 minutes,
suppressed until session window closed, combined with a GlobalKTable would
be how I'd start to approach this in the DSL, with the below topology. I
have no idea if my ASCII art below will survive email formatting, so I'll
try to explain. User ids stream into the GlobalKTable, and also into the
session store. After 10 minutes of inactivity for a given user id key, the
session expires, and the session store emits the user_id -> some_value. I'd
then map the some_value to null, to take advantage of KTable semantics
where `k -> null` is treated as a delete for key k, so an inactive user
would be deleted from the ktable. You could then periodically query the
ktable's key-value store for outside emission.
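
The `k -> null` delete semantics mentioned above can be illustrated without
any Kafka dependency. A hedged sketch: TombstoneTable and apply are
hypothetical names, and a HashMap stands in for the table's key-value
store.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of a table materialized from a changelog of (key, value)
// records (assumption: not Kafka API, only the update rule is modeled).
class TombstoneTable {
    private final Map<String, String> state = new HashMap<>();

    // Apply one record: a null value is a tombstone and deletes the key,
    // any other value is an upsert.
    void apply(String key, String value) {
        if (value == null) {
            state.remove(key);
        } else {
            state.put(key, value);
        }
    }

    boolean contains(String key) {
        return state.containsKey(key);
    }

    int size() {
        return state.size();
    }

    public static void main(String[] args) {
        TombstoneTable table = new TombstoneTable();
        table.apply("user1", "active");
        table.apply("user1", null); // session closed, mapped to null
        System.out.println(table.contains("user1"));
    }
}
```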

That said, this is only how I'd start to explore the problem, and there are
obvious questions that need to be answered first like how much state would
you end up storing in the session store, etc. I'm hoping someone like John
Roesler who has far better insights into Kafka Streams might weigh in here.


user ids ------------------------------------------------------> globalktable <---- keyValueStore periodically queried.
      \------------> session store ----> map (user_id -> null) --/

Good luck,

Liam

On Thu, Feb 18, 2021 at 7:49 AM Navneeth Krishnan <re...@gmail.com>
wrote:

> Hi Liam,
>
> The use case is stream all data and send it to storage after processing.
> Also when the user is inactive for a 10 min period then send a special
> event that marks the user as inactive. I'm trying to implement the special
> event here.
>
> Thanks

Re: Window Store

Posted by Navneeth Krishnan <re...@gmail.com>.
Hi Liam,

The use case is to stream all data and send it to storage after processing.
Also, when a user is inactive for a 10 min period, we need to send a special
event that marks the user as inactive. I'm trying to implement that special
event here.

Thanks


On Tue, Feb 16, 2021 at 1:18 AM Liam Clarke-Hutchinson <
liam.clarke@adscale.co.nz> wrote:

> Hey Navneeth,
>
> So to understand your problem better - do you only want to stream users
> active within 10 minutes to storage?
>
> Cheers,
>
> Liam

Re: Window Store

Posted by Liam Clarke-Hutchinson <li...@adscale.co.nz>.
Hey Navneeth,

So to understand your problem better - do you only want to stream users
active within 10 minutes to storage?

Cheers,

Liam

On Tue, Feb 16, 2021 at 9:50 AM Navneeth Krishnan <re...@gmail.com>
wrote:

> It’s just for emitting to data storage. There is no join here.
>
> Thanks

Re: Window Store

Posted by Navneeth Krishnan <re...@gmail.com>.
It’s just for emitting to data storage. There is no join here.

Thanks

On Mon, Feb 15, 2021 at 1:42 AM Liam Clarke-Hutchinson <
liam.clarke@adscale.co.nz> wrote:

> Hi Navneeth,
>
> What is the purpose of holding these user records? Is it to join against
> other streams, or emit to data storage?
>
> Cheers,
>
> Liam Clarke-Hutchinson

Re: Window Store

Posted by Liam Clarke-Hutchinson <li...@adscale.co.nz>.
Hi Navneeth,

What is the purpose of holding these user records? Is it to join against
other streams, or emit to data storage?

Cheers,

Liam Clarke-Hutchinson



On Mon, 15 Feb. 2021, 9:08 pm Navneeth Krishnan, <re...@gmail.com>
wrote:

> Hi All,
>
> I have a question about how I can use window stores to achieve this use
> case. Thanks for all the help.
>
> A user record will be created when the user first logins and the records
> needs to be cleaned up after 10 mins of inactivity. Thus for each user
> there will be a TTL but the TTL value will be updated each time when the
> user is active before he becomes inactive for the entire 10 min period. We
> are currently using PAPI for all our topologies and I was thinking of
> implementing it using a punctuator.
>
> My initial logic was to have a KV store with each user as key and TTL as
> the value and run a scheduled task every minute that looks at all the
> records which have TTL value lesser than the timestamp. But the problem in
> this approach was performance. When there are more than 1M records it takes
> more than a few seconds to complete this task.
>
> Next approach is to have a window store and a KV store. Window store will
> have each user and corresponding TTL rounded to the nearest minute. Then
> find all keys between the current time and current time - 1min. Then
> iterate these keys and use the KV store to find if the TTL value is still
> the same or if we have received any updates after that. If not then the
> user will be evicted.
>
> What would be a better and much more scalable solution for this.
>
> Thanks
>