Posted to users@kafka.apache.org by Bruno Bottazzini <br...@targatelematics.com> on 2018/07/19 16:06:48 UTC

Use Kafka Streams for windowing data and processing each window at once

Hello,

We have a question about how Kafka Streams works, or at least we are
having some trouble getting it to work.

What we want to achieve is to group the messages that we receive from a
Kafka topic by user and window them, so that we can aggregate the
messages received within each window (5 minutes). Then we would like to
collect all the aggregates of each window and process them at once,
adding them to a report of all the messages received in that 5-minute
interval.

The last point seems to be the tough part, as Kafka Streams doesn't seem
to provide (at least we can't find it :( ) anything that can collect all
the data belonging to a window into a "finite" stream to be processed in
one place.

The file implemented_code.txt contains the code we have implemented,
including at least one of our attempts to make this work.

You can find its output in the file result.txt.

For each window there are many log lines, and they are interleaved with
the lines of other windows.

What I'd like to have is something like:

// Hypothetical implementation
windowedMessages.streamWindows((interval, window) -> process(interval, window));

where the process method would be something like:

// Hypothetical implementation
void process(Interval interval, WindowStream<UserId, List<Message>> windowStream) {
    // Create the report for the whole window
    Report report = new Report(nameFromInterval(interval));
    // Loop over the finite iterable that represents the window content
    for (WindowStreamEntry<UserId, List<Message>> entry : windowStream) {
        report.addLine(entry.getKey(), entry.getValue());
    }
    report.close();
}
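
The behaviour requested above can be sketched in plain Java, independent of Kafka Streams: messages are bucketed into 5-minute tumbling windows by timestamp, grouped by user inside each window, and each window is then turned into one report. This is only a sketch under stated assumptions. The names `WindowedGrouping`, `Message`, `windowStart`, `groupByWindowAndUser`, and `reportLines` are hypothetical and belong to no Kafka API, and the whole input is assumed to fit in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: none of these types belong to Kafka Streams.
final class WindowedGrouping {

    /** A message with its user, event timestamp in milliseconds, and payload. */
    static final class Message {
        final String userId;
        final long timestampMs;
        final String payload;

        Message(String userId, long timestampMs, String payload) {
            this.userId = userId;
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Groups messages by window start, then by user; both levels are kept sorted. */
    static Map<Long, Map<String, List<Message>>> groupByWindowAndUser(
            List<Message> messages, long windowSizeMs) {
        Map<Long, Map<String, List<Message>>> windows = new TreeMap<>();
        for (Message m : messages) {
            windows.computeIfAbsent(windowStart(m.timestampMs, windowSizeMs), k -> new TreeMap<>())
                   .computeIfAbsent(m.userId, k -> new ArrayList<>())
                   .add(m);
        }
        return windows;
    }

    /** Builds one report (a list of lines) for a single, complete window. */
    static List<String> reportLines(long windowStartMs, Map<String, List<Message>> byUser) {
        List<String> lines = new ArrayList<>();
        lines.add("window starting at " + windowStartMs);
        byUser.forEach((user, msgs) -> lines.add(user + ": " + msgs.size() + " message(s)"));
        return lines;
    }

    public static void main(String[] args) {
        long fiveMinutesMs = 5 * 60 * 1000L;
        List<Message> input = Arrays.asList(
                new Message("alice", 10_000L, "a1"),
                new Message("bob", 20_000L, "b1"),
                new Message("alice", 310_000L, "a2"));
        // One report per window, each produced in a single pass over that window.
        groupByWindowAndUser(input, fiveMinutesMs)
                .forEach((start, byUser) -> reportLines(start, byUser).forEach(System.out::println));
    }
}
```

In a streaming setting the difficulty is knowing when a window is complete, which this batch-style sketch sidesteps by having all the input available up front.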


Re: Use Kafka Streams for windowing data and processing each window at once

Posted by Bruno Bottazzini <br...@targatelematics.com>.
Hello Bill,

Thank you very much for the response. We are looking forward to the KIP
feature.

Best Regards,

Bruno

On Thu, 2018-07-19 at 12:59 -0400, Bill Bejeck wrote:
> Hi Bruno,
> 
> What you are asking is a common request.  There is a KIP in the works,
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables,
> that should suit the requirements you've outlined.
> 
> In the meantime, I'll see if I can come up with an alternative approach
> over the next few days.
> 
> -Bill

Re: Use Kafka Streams for windowing data and processing each window at once

Posted by Bruno Bottazzini <br...@targatelematics.com>.
Hello Guozhang,

I understand. 

Thank you very much for the answer.

Best Regards,

Bruno

On Mon, 2018-07-23 at 16:35 -0700, Guozhang Wang wrote:
> I see.
> 
> In that case, one workaround would be to query the state store directly
> after you know that no more updates would be applied to that store in a
> `punctuation` function: note that punctuation is a feature that's only
> available in the Processor API, but you can always add such a lower-level
> implementation into your DSL topology by calling `KStream#process() /
> transform()`.
> 
> Then what you can do, is to schedule a stream-time based punctuation
> function with the scheduling interval equal to the window size, and then
> whenever the punctuation function is triggered, it is triggered with the
> current stream time as its parameter, by using this stream time you can
> then determine which window(s) have become final: for example, if your
> window length is 10 and grace period is configured as 5, and your
> punctuation function triggers at 25, then you know that your window of [0,
> 10) should have been final already. You can then access that windowed state
> store directly using the fetchAll(fromTimestamp, toTimestamp) API to fetch
> all the keys of that range to generate the global report.
> 
> 
> Guozhang

Re: Use Kafka Streams for windowing data and processing each window at once

Posted by Guozhang Wang <wa...@gmail.com>.
I see.

In that case, one workaround would be to query the state store directly
in a `punctuation` function, once you know that no more updates will be
applied to that store. Note that punctuation is a feature that is only
available in the Processor API, but you can always add such a lower-level
implementation to your DSL topology by calling `KStream#process()` or
`KStream#transform()`.

What you can do, then, is schedule a stream-time-based punctuation
function with the scheduling interval equal to the window size. Whenever
the punctuation function is triggered, it receives the current stream
time as its parameter, and from this stream time you can determine which
window(s) have become final. For example, if your window length is 10,
the grace period is configured as 5, and your punctuation function
triggers at 25, then you know that your window [0, 10) should already be
final. You can then access the windowed state store directly, using the
fetchAll(fromTimestamp, toTimestamp) API to fetch all the keys in that
range and generate the global report.
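
The window arithmetic in the example above can be sketched as follows. This is a minimal plain-Java illustration, not Kafka Streams code: `FinalWindows` and `finalWindowStarts` are hypothetical names, and the sketch assumes tumbling windows and that a window [start, start + size) counts as final once start + size + grace <= stream time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: plain arithmetic, not a Kafka Streams API.
final class FinalWindows {

    /**
     * Returns the start times of the tumbling windows that are final at streamTime,
     * considering only windows that start at or after fromStart.
     * Assumption: a window is final once start + size + grace <= streamTime.
     */
    static List<Long> finalWindowStarts(long fromStart, long streamTime, long size, long grace) {
        List<Long> starts = new ArrayList<>();
        for (long start = fromStart; start + size + grace <= streamTime; start += size) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Window length 10, grace period 5, punctuation fired at stream time 25.
        System.out.println(finalWindowStarts(0, 25, 10, 5));
    }
}
```

With these numbers the sketch prints [0, 10]: [0, 10) has been final since stream time 15, and under the stated assumption [10, 20) becomes final at exactly 25. A punctuator would typically remember the last window it reported and pass the next start as fromStart, then use each returned start as the time bound when fetching from the window store.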


Guozhang


On Mon, Jul 23, 2018 at 12:48 AM, Bruno Bottazzini <
bruno.bottazzini@targatelematics.com> wrote:

> Hello Guozhang,
>
> Managing to have a stream with just one record per each key per window
> is definitely something we want to achieve.
>
> But, it is not all. We also want to process the whole window at once so
> our callback would receive just one collection of aggregates per window
> once it is ready.
>
> We would probably need to receive the collection as an iterable that
> dynamically loads the window in chunks as the size of the window could
> be unmanageable as single object.
>
> This way we could produce one report for each window in the example
> "Final window result per key" you manage to send an alert for each user
> with less than three events but we also want to collect in one report
> the list of all users with less than three events in the one hour
> window.
>
> Best Regards,
>
> Bruno



-- 
-- Guozhang

Re: Use Kafka Streams for windowing data and processing each window at once

Posted by Bruno Bottazzini <br...@targatelematics.com>.
Hello Guozhang,

Having a stream with just one record per key per window is definitely
something we want to achieve.

But that is not all. We also want to process the whole window at once, so
that our callback receives just one collection of aggregates per window,
once it is ready.

We would probably need to receive the collection as an iterable that
loads the window dynamically in chunks, as the window could be too large
to handle as a single object.

This way we could produce one report for each window. In the example
"Final window results per key", you send an alert for each user with
fewer than three events, but we also want to collect, in one report, the
list of all users with fewer than three events in the one-hour window.
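
The kind of iterable described above, one that loads a window in chunks rather than as a single object, could look roughly like this in plain Java. It is only a sketch: `ChunkedIterable` and `pageLoader` are hypothetical names, and `pageLoader` stands in for whatever would read one slice of a window from storage, with an empty list signalling that there is no more data.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntFunction;

// Hypothetical illustration: not part of Kafka Streams.
final class ChunkedIterable<T> implements Iterable<T> {
    // Maps a page index to the entries of that page; an empty list means "no more data".
    private final IntFunction<List<T>> pageLoader;

    ChunkedIterable(IntFunction<List<T>> pageLoader) {
        this.pageLoader = pageLoader;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private int nextPage = 0;
            private Iterator<T> current = Collections.emptyIterator();
            private boolean exhausted = false;

            @Override
            public boolean hasNext() {
                // Load the next page only after the current one has been consumed.
                while (!current.hasNext() && !exhausted) {
                    List<T> page = pageLoader.apply(nextPage++);
                    if (page.isEmpty()) {
                        exhausted = true;
                    } else {
                        current = page.iterator();
                    }
                }
                return current.hasNext();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return current.next();
            }
        };
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("u1", "u2", "u3", "u4", "u5");
        int pageSize = 2;
        ChunkedIterable<String> window = new ChunkedIterable<>(page -> {
            int from = Math.min(page * pageSize, all.size());
            int to = Math.min(from + pageSize, all.size());
            return all.subList(from, to);
        });
        for (String user : window) {
            System.out.println(user);
        }
    }
}
```

Only one page is held at a time, so memory use is bounded by the page size rather than by the size of the window.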

Best Regards,

Bruno

On Fri, 2018-07-20 at 09:11 -0700, Guozhang Wang wrote:
> Hello Bruno,
> 
> We've discussed about the callback approach before, but then we realized
> with the proposed API, this can still be achieved. In the "Final window
> results per key" section, John showed how to do that. Note the resulted
> stream will have exactly one record per each key, with the value
> representing the "final result" for that key.
> 
> 
> Guozhang

Re: Use Kafka Streams for windowing data and processing each window at once

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Bruno,

We have discussed the callback approach before, but then we realized that
this can still be achieved with the proposed API. In the "Final window
results per key" section, John showed how to do that. Note that the
resulting stream will have exactly one record per key, with the value
representing the "final result" for that key.
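
To illustrate what "exactly one record per key" means here, the following plain-Java model buffers intermediate updates for a single window and emits only the latest value per key once the window has closed. It is a sketch of the idea only, not the API proposed in the KIP: `FinalResultBuffer`, `update`, and `emitIfClosed` are hypothetical names, and the window is assumed to close once stream time reaches window end plus grace.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "final results only" for one window; not the Kafka Streams implementation.
final class FinalResultBuffer {
    private final long windowEnd;
    private final long grace;
    private final Map<String, Long> latestPerKey = new LinkedHashMap<>();

    FinalResultBuffer(long windowEnd, long grace) {
        this.windowEnd = windowEnd;
        this.grace = grace;
    }

    /** Records an intermediate aggregate; a later update for the same key overwrites it. */
    void update(String key, long aggregate) {
        latestPerKey.put(key, aggregate);
    }

    /** Emits one "key=value" record per key once the window has closed, and nothing before that. */
    List<String> emitIfClosed(long streamTime) {
        List<String> out = new ArrayList<>();
        if (streamTime >= windowEnd + grace) {
            latestPerKey.forEach((key, value) -> out.add(key + "=" + value));
            latestPerKey.clear();
        }
        return out;
    }

    public static void main(String[] args) {
        FinalResultBuffer buffer = new FinalResultBuffer(10, 5);
        buffer.update("alice", 1);
        buffer.update("alice", 2);
        buffer.update("bob", 1);
        System.out.println(buffer.emitIfClosed(12)); // window still open
        System.out.println(buffer.emitIfClosed(15)); // window closed
    }
}
```

The first call prints [] because the window is still open at stream time 12, and the second prints [alice=2, bob=1]: the intermediate value for alice is never emitted.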


Guozhang


On Fri, Jul 20, 2018 at 8:13 AM, Bruno Bottazzini <
bruno.bottazzini@targatelematics.com> wrote:

> Bill,
>
> After reading the documentation and sure it looks really close to our
> need however I had a doubt about it.
>
> One small question.
>
> I was expecting also a callback that Kafka would call after the whole
> period is passed and this callback would receive an iterable object
> that contains all the aggregated information that was collected in the
> same period.
>
> Will it be possible when using grace() or suppress()?
>
> Best Regards,
>
> Bruno



-- 
-- Guozhang

Re: Use Kafka Streams for windowing data and processing each window at once

Posted by Bruno Bottazzini <br...@targatelematics.com>.
Bill,

After reading the documentation, it looks really close to what we need;
however, I have one doubt about it.

One small question.

I was also expecting a callback that Kafka would call after the whole
period has passed, and that this callback would receive an iterable
object containing all the aggregated information collected in that
period.

Will this be possible when using grace() or suppress()?

Best Regards,

Bruno

On Thu, 2018-07-19 at 12:59 -0400, Bill Bejeck wrote:
> Hi Bruno,
> 
> What you are asking is a common request.  There is a KIP in the works,
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables,
> that should suit the requirements you've outlined.
> 
> In the meantime, I'll see if I can come up with an alternative approach
> over the next few days.
> 
> -Bill

Re: Use Kafka Streams for windowing data and processing each window at once

Posted by Bill Bejeck <bi...@confluent.io>.
Hi Bruno,

What you are asking for is a common request.  There is a KIP in the works,
https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables,
that should suit the requirements you've outlined.

In the meantime, I'll see if I can come up with an alternative approach
over the next few days.

-Bill
