You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by vb...@hushmail.com on 2018/07/17 08:09:39 UTC

how to aggregate sessions

Hi,

My use case includes consuming events for sessions and once an
inactivity gap is over creating a detailed report on them. From the
docs (using 1.0.1 currently) it is not clear what is the best way to
achieve this, it seems  actions like reduce and aggregate create
results with the same type as their inputs and they produce a new
update for each arriving event.

Is this a suitable use case for a kafka streams application? Any
pointers how to create and publish statistics to a topic only once
after all related events grouped by some key arrived? Possibly only
reading the messages from the topic when a session is to be fully
processed?

Thanks

Re: how to aggregate sessions

Posted by Vincent Maurin <vi...@gmail.com>.
I would say the SessionWindow is exactly the behavior you are describing,
so the code you put should produce the expected result.
Also check about timestamp extractor to be sure that you use timestamps
that make sense for your use case :
https://kafka.apache.org/11/documentation/streams/developer-guide/config-streams.html#timestamp-extractor


On Tue, Jul 17, 2018 at 3:52 PM <vb...@hushmail.com> wrote:

> Hi Vincent,
>
> Thanks a bunch lot for the explanation.
>
> >you can choose when you want to produce your output event based on
> the state
>
> How do I achieve this? I want to hook into the session expiration to
> call my report function with the accumulated state once the inactivity
> gap for the given key is elapsed.
>
> .stream(INPUT)
> .groupByKey()
> .windowedBy(SessionWindow)
> .aggregate()
> .map(createReportFromAccumulatedState())
> .toStream(OUTPUT)
>
> Am I on the right path here? Do I need to detect elapsed session
> windows manually somehow?
>
> On 2018. 07. 17. at 10:31 AM, "Vincent Maurin"  wrote:Hi
>
> Kafka streams sounds like a good solution there.
> The first step is to properly partition your event topics, based on
> the
> session key so all events for the same session will goes to the same
> partition.
> Then you could build your kafka streams application, that will
> maintains a
> state (manually managed or using DSL aggregation functions) with a
> windows.
> So a event will be immediately consumed, but to update the state only,
> then
> you can choose when you want to produce your output event based on the
> state
> You can read about windowing in the official documentation here
>
> https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#windowing
>
> Best,
> Vincent
> On Tue, Jul 17, 2018 at 10:09 AM  wrote:
>
> > Hi,
> >
> > My use case includes consuming events for sessions and once an
> > inactivity gap is over creating a detailed report on them. From the
> > docs (using 1.0.1 currently) it is not clear what is the best way to
> > achieve this, it seems  actions like reduce and aggregate create
> > results with the same type as their inputs and they produce a new
> > update for each arriving event.
> >
> > Is this a suitable use case for a kafka streams application? Any
> > pointers how to create and publish statistics to a topic only once
> > after all related events grouped by some key arrived? Possibly only
> > reading the messages from the topic when a session is to be fully
> > processed?
> >
> > Thanks
> >

Re: how to aggregate sessions

Posted by vb...@hushmail.com.
Hi Vincent,

Thanks a bunch lot for the explanation.

>you can choose when you want to produce your output event based on
the state

How do I achieve this? I want to hook into the session expiration to
call my report function with the accumulated state once the inactivity
gap for the given key is elapsed.

.stream(INPUT)
.groupByKey()
.windowedBy(SessionWindow)
.aggregate()
.map(createReportFromAccumulatedState())
.toStream(OUTPUT)

Am I on the right path here? Do I need to detect elapsed session
windows manually somehow?

On 2018. 07. 17. at 10:31 AM, "Vincent Maurin"  wrote:Hi

Kafka streams sounds like a good solution there.
The first step is to properly partition your event topics, based on
the
session key so all events for the same session will goes to the same
partition.
Then you could build your kafka streams application, that will
maintains a
state (manually managed or using DSL aggregation functions) with a
windows.
So a event will be immediately consumed, but to update the state only,
then
you can choose when you want to produce your output event based on the
state
You can read about windowing in the official documentation here
https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#windowing

Best,
Vincent
On Tue, Jul 17, 2018 at 10:09 AM  wrote:

> Hi,
>
> My use case includes consuming events for sessions and once an
> inactivity gap is over creating a detailed report on them. From the
> docs (using 1.0.1 currently) it is not clear what is the best way to
> achieve this, it seems  actions like reduce and aggregate create
> results with the same type as their inputs and they produce a new
> update for each arriving event.
>
> Is this a suitable use case for a kafka streams application? Any
> pointers how to create and publish statistics to a topic only once
> after all related events grouped by some key arrived? Possibly only
> reading the messages from the topic when a session is to be fully
> processed?
>
> Thanks
>

Re: how to aggregate sessions

Posted by Vincent Maurin <vi...@gmail.com>.
Hi

Kafka streams sounds like a good solution there.
The first step is to properly partition your event topics, based on the
session key so all events for the same session will goes to the same
partition.
Then you could build your kafka streams application, that will maintains a
state (manually managed or using DSL aggregation functions) with a windows.
So a event will be immediately consumed, but to update the state only, then
you can choose when you want to produce your output event based on the state
You can read about windowing in the official documentation here
https://kafka.apache.org/11/documentation/streams/developer-guide/dsl-api.html#windowing

Best,
Vincent



On Tue, Jul 17, 2018 at 10:09 AM <vb...@hushmail.com> wrote:

> Hi,
>
> My use case includes consuming events for sessions and once an
> inactivity gap is over creating a detailed report on them. From the
> docs (using 1.0.1 currently) it is not clear what is the best way to
> achieve this, it seems  actions like reduce and aggregate create
> results with the same type as their inputs and they produce a new
> update for each arriving event.
>
> Is this a suitable use case for a kafka streams application? Any
> pointers how to create and publish statistics to a topic only once
> after all related events grouped by some key arrived? Possibly only
> reading the messages from the topic when a session is to be fully
> processed?
>
> Thanks
>