Posted to users@kafka.apache.org by Viktor Markvardt <vi...@gmail.com> on 2020/01/14 16:41:11 UTC

Streams, Kafka windows

Hi,

My name is Viktor. I'm currently working with Kafka Streams and have
several questions about Kafka that I cannot find answers to in the official
docs.

1. Why does the suppress functionality not work with hopping windows? How can I
make it work?

Example of the code:

KStream<String, String> finalStream = source
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
    .reduce((aggValue, newValue) -> newValue,
            Materialized.with(Serdes.String(), Serdes.String()))
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream();

finalStream.print(Printed.toSysOut());
finalStream.to(outputTopic);

After I run the code above, the output stream is empty. There were no
errors/exceptions.
NOTE: With a tumbling window the code works as expected.
Maybe I'm simply using it incorrectly?

2. Why are there duplicates in the output stream with hopping windows (without
suppress)?
E.g., I send one record into the input KStream with a hopping window
(duration=30s, advanceBy=2s) but get two identical records (duplicates) in the
output KStream.
Is that expected behavior? If so, how can I filter out/switch off these
duplicates?

3. Mainly I'm trying to solve this problem:
I have a KStream of events, and events can be repeated (duplicates).
In the output KStream I would like to receive only unique events for the
last 24 hours (window duration) with a 1-hour window advance (window
advanceBy).
Could you recommend any code examples or docs, please?
I have already read the official docs and examples, but they were not enough to
fully understand how I can achieve this.

Best regards,
Viktor Markvardt

Re: Streams, Kafka windows

Posted by Sachin Mittal <sj...@gmail.com>.
You can try converting the final resultant stream to a table.
Check this page for more info:
https://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step

This way the table would always contain the latest (single) record for a given
key.
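
For illustration, a minimal sketch of that idiom (not code from this thread;
resultStream and latestPerKey are placeholder names, and a KStream<String,
String> with String serdes is assumed) — a dummy reduce that keeps only the
latest value per key, so the resulting table acts as a "last write wins" view:

KTable<String, String> latestPerKey = resultStream
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
    // "last write wins": the newest value replaces the stored value for the key
    .reduce((oldValue, newValue) -> newValue);

latestPerKey.toStream().to(outputTopic);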

Sachin




On Tue, Jan 14, 2020 at 10:11 PM Viktor Markvardt <
viktor.markvardt@gmail.com> wrote:

> Hi,
>
> My name is Viktor. I'm currently working with Kafka streams and have
> several questions about Kafka and I can not find answers in the official
> docs.
>
> 1. Why suppress functionality does not work with Hopping windows? How to
> make it work?
>
> Example of the code:
>
> KStream<String, String> finalStream = source
>                 .groupByKey()
>
>
> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
>                 .reduce((aggValue, newValue) -> newValue,
> Materialized.with(Serdes.String(), Serdes.String()))
>
>
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>                 .toStream();
>
> finalStream.print(Printed.toSysOut());
> finalStream.to(outputTopic);
>
> After I run the code above - output stream is empty. There were no
> errors/exceptions.
> NOTE: With Tumbling Window the code working as expected.
> Maybe I simply use it incorrectly?
>
> 2. Why with Hopping windows (without suppress) there are duplicates in the
> output stream?
> E.g., I send one record in the input kstream with Hopping window
> (duration=30s, advanceBy=2s) but get two same records (duplicate) in the
> output kstream.
> Is that an expected behavior? If so, how can I filter/switch off these
> duplicates?
>
> 3. Mainly I'm trying to solve this problem:
> I have kstream with events inside and events can be repeated (duplicates).
> In the output kstream I would like to receive only unique events for the
> last 24 hours (window duration) with 1 hour window overlay (window
> advanceBy).
> Could you recommend me any examples of code or docs please?
> I have already read official docs and examples but it was not enough to get
> full understanding of how I can achieve this.
>
> Best regards,
> Viktor Markvardt
>

Re: Streams, Kafka windows

Posted by John Roesler <vv...@apache.org>.
Setting a new record for procrastination, I've just created this ticket:
https://issues.apache.org/jira/browse/KAFKA-10058

On Sat, Jan 18, 2020, at 22:03, John Roesler wrote:
> Good idea! I’ll make a note to do it when I’m at a computer. 
> 
> On Sat, Jan 18, 2020, at 21:51, Guozhang Wang wrote:
> > Hey John,
> > 
> > Since this is a common question and I've seen many users asking about
> > window semantics like this, could you file a JIRA ticket for creating a
> > wiki page like Join Semantics (
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics)
> > to summarize the windowing operations like this too?
> > 
> > Guozhang
> > 
> > On Sat, Jan 18, 2020 at 3:22 PM John Roesler <vv...@apache.org> wrote:
> > 
> > > Glad it helped!
> > > -John
> > >
> > > On Sat, Jan 18, 2020, at 12:27, Viktor Markvardt wrote:
> > > > Hi John,
> > > >
> > > > Thank you for your assistance!
> > > > Your example very help me and I understood kafka-streams more clearly
> > > now.
> > > > Have a nice weekend :)
> > > >
> > > > Best regards,
> > > > Viktor Markvardt
> > > >
> > > > чт, 16 янв. 2020 г. в 19:29, John Roesler <vv...@apache.org>:
> > > >
> > > > > Hi Viktor,
> > > > >
> > > > > I’m starting to wonder what exactly “duplicate” means in this context.
> > > Can
> > > > > you elaborate?
> > > > >
> > > > > In case it helps, with your window definition, if I send a record with
> > > > > timestamp 20, it would actually belong to three different windows:
> > > > > [0,30)
> > > > > [10,40)
> > > > > [20,50)
> > > > >
> > > > > Because of this, you would (correctly) see three output records for
> > > that
> > > > > one input, but the outputs wouldn’t be “duplicates” properly, because
> > > > > they’d have different keys:
> > > > >
> > > > > Input:
> > > > > Key1: Val1 @ timestamp:20
> > > > >
> > > > > Output:
> > > > > Windowed<Window(0,30),Key1>: 1
> > > > > Windowed<Window(10,40),Key1>: 1
> > > > > Windowed<Window(20,50),Key1>: 1
> > > > >
> > > > > Any chance that explains your observation?
> > > > >
> > > > > Thanks,
> > > > > John
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Jan 16, 2020, at 10:18, Viktor Markvardt wrote:
> > > > > > Hi John,
> > > > > >
> > > > > > Thanks for answering my questions!
> > > > > > I observe behavior which I can not understand.
> > > > > > The code is working, but when delay between records larger then
> > > window
> > > > > > duration I receive duplicated records.
> > > > > > With the code below I received duplicated records in the output
> > > kstream.
> > > > > > Count of duplicate records is always 3. If I change
> > > duration/advanceBy
> > > > > > count of duplicated records is changing also.
> > > > > > Do you have any ideas why duplicated records are received in the
> > > output
> > > > > > kstream?
> > > > > >
> > > > > > KStream<String, String> windowedStream = source
> > > > > >     .groupByKey()
> > > > > >
> > > > > >
> > > > >
> > > .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ZERO).advanceBy(Duration.ofSeconds(10)))
> > > > > >     .count()
> > > > > >
> > > > > >
> > > > >
> > > .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > > >     .toStream();
> > > > > >
> > > > > >
> > > > > > Best regards,
> > > > > > Viktor Markvardt
> > > > > >
> > > > > > чт, 16 янв. 2020 г. в 04:59, John Roesler <vv...@apache.org>:
> > > > > >
> > > > > > > Hi Viktor,
> > > > > > >
> > > > > > > I’m not sure why you get two identical outputs in response to a
> > > single
> > > > > > > record. Regardless, since you say that you want to get a single,
> > > final
> > > > > > > result for the window and you expect multiple inputs to the
> > > windows,
> > > > > you
> > > > > > > need Suppression.
> > > > > > >
> > > > > > > My guess is that you just sent one record to try it out and didn’t
> > > see
> > > > > any
> > > > > > > output? This is expected. Just as the window boundaries are
> > > defined by
> > > > > the
> > > > > > > time stamps of the records, not by the current system time,
> > > > > suppression is
> > > > > > > governed by the timestamp of the records. I.e., a thirty-second
> > > window
> > > > > is
> > > > > > > not actually closed until you see a new record with a timestamp
> > > thirty
> > > > > > > seconds later.
> > > > > > >
> > > > > > >  Maybe try just sending a sequence of updates with incrementing
> > > > > > > timestamps. If the first record has timestamp T, then you should
> > > see an
> > > > > > > output when you pass in a record with timestamp T+30.
> > > > > > >
> > > > > > > Important note: there is a built-in grace period that delays the
> > > > > output of
> > > > > > > final results after the window ends. For complicated reasons, the
> > > > > default
> > > > > > > is 24 hours! So you would actually not see an output until you
> > > send a
> > > > > > > record with timestamp T+30+(24 hours) ! I strongly recommend you
> > > set
> > > > > the
> > > > > > > grace period on TimeWindows to zero for your testing. You can
> > > increase
> > > > > it
> > > > > > > later if you want to tolerate some late-arriving records.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > -John
> > > > > > >
> > > > > > > On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > My name is Viktor. I'm currently working with Kafka streams and
> > > have
> > > > > > > > several questions about Kafka and I can not find answers in the
> > > > > official
> > > > > > > > docs.
> > > > > > > >
> > > > > > > > 1. Why suppress functionality does not work with Hopping windows?
> > > > > How to
> > > > > > > > make it work?
> > > > > > > >
> > > > > > > > Example of the code:
> > > > > > > >
> > > > > > > > KStream<String, String> finalStream = source
> > > > > > > >                 .groupByKey()
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > >
> > > .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
> > > > > > > >                 .reduce((aggValue, newValue) -> newValue,
> > > > > > > > Materialized.with(Serdes.String(), Serdes.String()))
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > >
> > > .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > > > > >                 .toStream();
> > > > > > > >
> > > > > > > > finalStream.print(Printed.toSysOut());
> > > > > > > > finalStream.to(outputTopic);
> > > > > > > >
> > > > > > > > After I run the code above - output stream is empty. There were
> > > no
> > > > > > > > errors/exceptions.
> > > > > > > > NOTE: With Tumbling Window the code working as expected.
> > > > > > > > Maybe I simply use it incorrectly?
> > > > > > > >
> > > > > > > > 2. Why with Hopping windows (without suppress) there are
> > > duplicates
> > > > > in
> > > > > > > the
> > > > > > > > output stream?
> > > > > > > > E.g., I send one record in the input kstream with Hopping window
> > > > > > > > (duration=30s, advanceBy=2s) but get two same records
> > > (duplicate) in
> > > > > the
> > > > > > > > output kstream.
> > > > > > > > Is that an expected behavior? If so, how can I filter/switch off
> > > > > these
> > > > > > > > duplicates?
> > > > > > > >
> > > > > > > > 3. Mainly I'm trying to solve this problem:
> > > > > > > > I have kstream with events inside and events can be repeated
> > > > > > > (duplicates).
> > > > > > > > In the output kstream I would like to receive only unique events
> > > for
> > > > > the
> > > > > > > > last 24 hours (window duration) with 1 hour window overlay
> > > (window
> > > > > > > > advanceBy).
> > > > > > > > Could you recommend me any examples of code or docs please?
> > > > > > > > I have already read official docs and examples but it was not
> > > enough
> > > > > to
> > > > > > > get
> > > > > > > > full understanding of how I can achieve this.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Viktor Markvardt
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > 
> > 
> > -- 
> > -- Guozhang
> >
>

Re: Streams, Kafka windows

Posted by John Roesler <vv...@apache.org>.
Good idea! I’ll make a note to do it when I’m at a computer. 

On Sat, Jan 18, 2020, at 21:51, Guozhang Wang wrote:
> Hey John,
> 
> Since this is a common question and I've seen many users asking about
> window semantics like this, could you file a JIRA ticket for creating a
> wiki page like Join Semantics (
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics)
> to summarize the windowing operations like this too?
> 
> Guozhang
> 
> On Sat, Jan 18, 2020 at 3:22 PM John Roesler <vv...@apache.org> wrote:
> 
> > Glad it helped!
> > -John
> >
> > On Sat, Jan 18, 2020, at 12:27, Viktor Markvardt wrote:
> > > Hi John,
> > >
> > > Thank you for your assistance!
> > > Your example very help me and I understood kafka-streams more clearly
> > now.
> > > Have a nice weekend :)
> > >
> > > Best regards,
> > > Viktor Markvardt
> > >
> > > чт, 16 янв. 2020 г. в 19:29, John Roesler <vv...@apache.org>:
> > >
> > > > Hi Viktor,
> > > >
> > > > I’m starting to wonder what exactly “duplicate” means in this context.
> > Can
> > > > you elaborate?
> > > >
> > > > In case it helps, with your window definition, if I send a record with
> > > > timestamp 20, it would actually belong to three different windows:
> > > > [0,30)
> > > > [10,40)
> > > > [20,50)
> > > >
> > > > Because of this, you would (correctly) see three output records for
> > that
> > > > one input, but the outputs wouldn’t be “duplicates” properly, because
> > > > they’d have different keys:
> > > >
> > > > Input:
> > > > Key1: Val1 @ timestamp:20
> > > >
> > > > Output:
> > > > Windowed<Window(0,30),Key1>: 1
> > > > Windowed<Window(10,40),Key1>: 1
> > > > Windowed<Window(20,50),Key1>: 1
> > > >
> > > > Any chance that explains your observation?
> > > >
> > > > Thanks,
> > > > John
> > > >
> > > >
> > > >
> > > > On Thu, Jan 16, 2020, at 10:18, Viktor Markvardt wrote:
> > > > > Hi John,
> > > > >
> > > > > Thanks for answering my questions!
> > > > > I observe behavior which I can not understand.
> > > > > The code is working, but when delay between records larger then
> > window
> > > > > duration I receive duplicated records.
> > > > > With the code below I received duplicated records in the output
> > kstream.
> > > > > Count of duplicate records is always 3. If I change
> > duration/advanceBy
> > > > > count of duplicated records is changing also.
> > > > > Do you have any ideas why duplicated records are received in the
> > output
> > > > > kstream?
> > > > >
> > > > > KStream<String, String> windowedStream = source
> > > > >     .groupByKey()
> > > > >
> > > > >
> > > >
> > .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ZERO).advanceBy(Duration.ofSeconds(10)))
> > > > >     .count()
> > > > >
> > > > >
> > > >
> > .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > >     .toStream();
> > > > >
> > > > >
> > > > > Best regards,
> > > > > Viktor Markvardt
> > > > >
> > > > > чт, 16 янв. 2020 г. в 04:59, John Roesler <vv...@apache.org>:
> > > > >
> > > > > > Hi Viktor,
> > > > > >
> > > > > > I’m not sure why you get two identical outputs in response to a
> > single
> > > > > > record. Regardless, since you say that you want to get a single,
> > final
> > > > > > result for the window and you expect multiple inputs to the
> > windows,
> > > > you
> > > > > > need Suppression.
> > > > > >
> > > > > > My guess is that you just sent one record to try it out and didn’t
> > see
> > > > any
> > > > > > output? This is expected. Just as the window boundaries are
> > defined by
> > > > the
> > > > > > time stamps of the records, not by the current system time,
> > > > suppression is
> > > > > > governed by the timestamp of the records. I.e., a thirty-second
> > window
> > > > is
> > > > > > not actually closed until you see a new record with a timestamp
> > thirty
> > > > > > seconds later.
> > > > > >
> > > > > >  Maybe try just sending a sequence of updates with incrementing
> > > > > > timestamps. If the first record has timestamp T, then you should
> > see an
> > > > > > output when you pass in a record with timestamp T+30.
> > > > > >
> > > > > > Important note: there is a built-in grace period that delays the
> > > > output of
> > > > > > final results after the window ends. For complicated reasons, the
> > > > default
> > > > > > is 24 hours! So you would actually not see an output until you
> > send a
> > > > > > record with timestamp T+30+(24 hours) ! I strongly recommend you
> > set
> > > > the
> > > > > > grace period on TimeWindows to zero for your testing. You can
> > increase
> > > > it
> > > > > > later if you want to tolerate some late-arriving records.
> > > > > >
> > > > > > Thanks,
> > > > > > -John
> > > > > >
> > > > > > On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> > > > > > > Hi,
> > > > > > >
> > > > > > > My name is Viktor. I'm currently working with Kafka streams and
> > have
> > > > > > > several questions about Kafka and I can not find answers in the
> > > > official
> > > > > > > docs.
> > > > > > >
> > > > > > > 1. Why suppress functionality does not work with Hopping windows?
> > > > How to
> > > > > > > make it work?
> > > > > > >
> > > > > > > Example of the code:
> > > > > > >
> > > > > > > KStream<String, String> finalStream = source
> > > > > > >                 .groupByKey()
> > > > > > >
> > > > > > >
> > > > > >
> > > >
> > .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
> > > > > > >                 .reduce((aggValue, newValue) -> newValue,
> > > > > > > Materialized.with(Serdes.String(), Serdes.String()))
> > > > > > >
> > > > > > >
> > > > > >
> > > >
> > .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > > > >                 .toStream();
> > > > > > >
> > > > > > > finalStream.print(Printed.toSysOut());
> > > > > > > finalStream.to(outputTopic);
> > > > > > >
> > > > > > > After I run the code above - output stream is empty. There were
> > no
> > > > > > > errors/exceptions.
> > > > > > > NOTE: With Tumbling Window the code working as expected.
> > > > > > > Maybe I simply use it incorrectly?
> > > > > > >
> > > > > > > 2. Why with Hopping windows (without suppress) there are
> > duplicates
> > > > in
> > > > > > the
> > > > > > > output stream?
> > > > > > > E.g., I send one record in the input kstream with Hopping window
> > > > > > > (duration=30s, advanceBy=2s) but get two same records
> > (duplicate) in
> > > > the
> > > > > > > output kstream.
> > > > > > > Is that an expected behavior? If so, how can I filter/switch off
> > > > these
> > > > > > > duplicates?
> > > > > > >
> > > > > > > 3. Mainly I'm trying to solve this problem:
> > > > > > > I have kstream with events inside and events can be repeated
> > > > > > (duplicates).
> > > > > > > In the output kstream I would like to receive only unique events
> > for
> > > > the
> > > > > > > last 24 hours (window duration) with 1 hour window overlay
> > (window
> > > > > > > advanceBy).
> > > > > > > Could you recommend me any examples of code or docs please?
> > > > > > > I have already read official docs and examples but it was not
> > enough
> > > > to
> > > > > > get
> > > > > > > full understanding of how I can achieve this.
> > > > > > >
> > > > > > > Best regards,
> > > > > > > Viktor Markvardt
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> 
> 
> -- 
> -- Guozhang
>

Re: Streams, Kafka windows

Posted by Guozhang Wang <wa...@gmail.com>.
Hey John,

Since this is a common question and I've seen many users asking about
window semantics like this, could you file a JIRA ticket for creating a
wiki page, similar to the Join Semantics page (
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics),
that summarizes the windowing operations as well?

Guozhang

On Sat, Jan 18, 2020 at 3:22 PM John Roesler <vv...@apache.org> wrote:

> Glad it helped!
> -John
>
> On Sat, Jan 18, 2020, at 12:27, Viktor Markvardt wrote:
> > Hi John,
> >
> > Thank you for your assistance!
> > Your example very help me and I understood kafka-streams more clearly
> now.
> > Have a nice weekend :)
> >
> > Best regards,
> > Viktor Markvardt
> >
> > чт, 16 янв. 2020 г. в 19:29, John Roesler <vv...@apache.org>:
> >
> > > Hi Viktor,
> > >
> > > I’m starting to wonder what exactly “duplicate” means in this context.
> Can
> > > you elaborate?
> > >
> > > In case it helps, with your window definition, if I send a record with
> > > timestamp 20, it would actually belong to three different windows:
> > > [0,30)
> > > [10,40)
> > > [20,50)
> > >
> > > Because of this, you would (correctly) see three output records for
> that
> > > one input, but the outputs wouldn’t be “duplicates” properly, because
> > > they’d have different keys:
> > >
> > > Input:
> > > Key1: Val1 @ timestamp:20
> > >
> > > Output:
> > > Windowed<Window(0,30),Key1>: 1
> > > Windowed<Window(10,40),Key1>: 1
> > > Windowed<Window(20,50),Key1>: 1
> > >
> > > Any chance that explains your observation?
> > >
> > > Thanks,
> > > John
> > >
> > >
> > >
> > > On Thu, Jan 16, 2020, at 10:18, Viktor Markvardt wrote:
> > > > Hi John,
> > > >
> > > > Thanks for answering my questions!
> > > > I observe behavior which I can not understand.
> > > > The code is working, but when delay between records larger then
> window
> > > > duration I receive duplicated records.
> > > > With the code below I received duplicated records in the output
> kstream.
> > > > Count of duplicate records is always 3. If I change
> duration/advanceBy
> > > > count of duplicated records is changing also.
> > > > Do you have any ideas why duplicated records are received in the
> output
> > > > kstream?
> > > >
> > > > KStream<String, String> windowedStream = source
> > > >     .groupByKey()
> > > >
> > > >
> > >
> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ZERO).advanceBy(Duration.ofSeconds(10)))
> > > >     .count()
> > > >
> > > >
> > >
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > >     .toStream();
> > > >
> > > >
> > > > Best regards,
> > > > Viktor Markvardt
> > > >
> > > > чт, 16 янв. 2020 г. в 04:59, John Roesler <vv...@apache.org>:
> > > >
> > > > > Hi Viktor,
> > > > >
> > > > > I’m not sure why you get two identical outputs in response to a
> single
> > > > > record. Regardless, since you say that you want to get a single,
> final
> > > > > result for the window and you expect multiple inputs to the
> windows,
> > > you
> > > > > need Suppression.
> > > > >
> > > > > My guess is that you just sent one record to try it out and didn’t
> see
> > > any
> > > > > output? This is expected. Just as the window boundaries are
> defined by
> > > the
> > > > > time stamps of the records, not by the current system time,
> > > suppression is
> > > > > governed by the timestamp of the records. I.e., a thirty-second
> window
> > > is
> > > > > not actually closed until you see a new record with a timestamp
> thirty
> > > > > seconds later.
> > > > >
> > > > >  Maybe try just sending a sequence of updates with incrementing
> > > > > timestamps. If the first record has timestamp T, then you should
> see an
> > > > > output when you pass in a record with timestamp T+30.
> > > > >
> > > > > Important note: there is a built-in grace period that delays the
> > > output of
> > > > > final results after the window ends. For complicated reasons, the
> > > default
> > > > > is 24 hours! So you would actually not see an output until you
> send a
> > > > > record with timestamp T+30+(24 hours) ! I strongly recommend you
> set
> > > the
> > > > > grace period on TimeWindows to zero for your testing. You can
> increase
> > > it
> > > > > later if you want to tolerate some late-arriving records.
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > > On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> > > > > > Hi,
> > > > > >
> > > > > > My name is Viktor. I'm currently working with Kafka streams and
> have
> > > > > > several questions about Kafka and I can not find answers in the
> > > official
> > > > > > docs.
> > > > > >
> > > > > > 1. Why suppress functionality does not work with Hopping windows?
> > > How to
> > > > > > make it work?
> > > > > >
> > > > > > Example of the code:
> > > > > >
> > > > > > KStream<String, String> finalStream = source
> > > > > >                 .groupByKey()
> > > > > >
> > > > > >
> > > > >
> > >
> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
> > > > > >                 .reduce((aggValue, newValue) -> newValue,
> > > > > > Materialized.with(Serdes.String(), Serdes.String()))
> > > > > >
> > > > > >
> > > > >
> > >
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > > >                 .toStream();
> > > > > >
> > > > > > finalStream.print(Printed.toSysOut());
> > > > > > finalStream.to(outputTopic);
> > > > > >
> > > > > > After I run the code above - output stream is empty. There were
> no
> > > > > > errors/exceptions.
> > > > > > NOTE: With Tumbling Window the code working as expected.
> > > > > > Maybe I simply use it incorrectly?
> > > > > >
> > > > > > 2. Why with Hopping windows (without suppress) there are
> duplicates
> > > in
> > > > > the
> > > > > > output stream?
> > > > > > E.g., I send one record in the input kstream with Hopping window
> > > > > > (duration=30s, advanceBy=2s) but get two same records
> (duplicate) in
> > > the
> > > > > > output kstream.
> > > > > > Is that an expected behavior? If so, how can I filter/switch off
> > > these
> > > > > > duplicates?
> > > > > >
> > > > > > 3. Mainly I'm trying to solve this problem:
> > > > > > I have kstream with events inside and events can be repeated
> > > > > (duplicates).
> > > > > > In the output kstream I would like to receive only unique events
> for
> > > the
> > > > > > last 24 hours (window duration) with 1 hour window overlay
> (window
> > > > > > advanceBy).
> > > > > > Could you recommend me any examples of code or docs please?
> > > > > > I have already read official docs and examples but it was not
> enough
> > > to
> > > > > get
> > > > > > full understanding of how I can achieve this.
> > > > > >
> > > > > > Best regards,
> > > > > > Viktor Markvardt
> > > > > >
> > > > >
> > > >
> > >
> >
>


-- 
-- Guozhang

Re: Streams, Kafka windows

Posted by John Roesler <vv...@apache.org>.
Glad it helped!
-John

On Sat, Jan 18, 2020, at 12:27, Viktor Markvardt wrote:
> Hi John,
> 
> Thank you for your assistance!
> Your example very help me and I understood kafka-streams more clearly now.
> Have a nice weekend :)
> 
> Best regards,
> Viktor Markvardt
> 
> чт, 16 янв. 2020 г. в 19:29, John Roesler <vv...@apache.org>:
> 
> > Hi Viktor,
> >
> > I’m starting to wonder what exactly “duplicate” means in this context. Can
> > you elaborate?
> >
> > In case it helps, with your window definition, if I send a record with
> > timestamp 20, it would actually belong to three different windows:
> > [0,30)
> > [10,40)
> > [20,50)
> >
> > Because of this, you would (correctly) see three output records for that
> > one input, but the outputs wouldn’t be “duplicates” properly, because
> > they’d have different keys:
> >
> > Input:
> > Key1: Val1 @ timestamp:20
> >
> > Output:
> > Windowed<Window(0,30),Key1>: 1
> > Windowed<Window(10,40),Key1>: 1
> > Windowed<Window(20,50),Key1>: 1
> >
> > Any chance that explains your observation?
> >
> > Thanks,
> > John
> >
> >
> >
> > On Thu, Jan 16, 2020, at 10:18, Viktor Markvardt wrote:
> > > Hi John,
> > >
> > > Thanks for answering my questions!
> > > I observe behavior which I can not understand.
> > > The code is working, but when delay between records larger then window
> > > duration I receive duplicated records.
> > > With the code below I received duplicated records in the output kstream.
> > > Count of duplicate records is always 3. If I change duration/advanceBy
> > > count of duplicated records is changing also.
> > > Do you have any ideas why duplicated records are received in the output
> > > kstream?
> > >
> > > KStream<String, String> windowedStream = source
> > >     .groupByKey()
> > >
> > >
> > .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ZERO).advanceBy(Duration.ofSeconds(10)))
> > >     .count()
> > >
> > >
> > .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > >     .toStream();
> > >
> > >
> > > Best regards,
> > > Viktor Markvardt
> > >
> > > чт, 16 янв. 2020 г. в 04:59, John Roesler <vv...@apache.org>:
> > >
> > > > Hi Viktor,
> > > >
> > > > I’m not sure why you get two identical outputs in response to a single
> > > > record. Regardless, since you say that you want to get a single, final
> > > > result for the window and you expect multiple inputs to the windows,
> > you
> > > > need Suppression.
> > > >
> > > > My guess is that you just sent one record to try it out and didn’t see
> > any
> > > > output? This is expected. Just as the window boundaries are defined by
> > the
> > > > time stamps of the records, not by the current system time,
> > suppression is
> > > > governed by the timestamp of the records. I.e., a thirty-second window
> > is
> > > > not actually closed until you see a new record with a timestamp thirty
> > > > seconds later.
> > > >
> > > >  Maybe try just sending a sequence of updates with incrementing
> > > > timestamps. If the first record has timestamp T, then you should see an
> > > > output when you pass in a record with timestamp T+30.
> > > >
> > > > Important note: there is a built-in grace period that delays the
> > output of
> > > > final results after the window ends. For complicated reasons, the
> > default
> > > > is 24 hours! So you would actually not see an output until you send a
> > > > record with timestamp T+30+(24 hours) ! I strongly recommend you set
> > the
> > > > grace period on TimeWindows to zero for your testing. You can increase
> > it
> > > > later if you want to tolerate some late-arriving records.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> > > > > Hi,
> > > > >
> > > > > My name is Viktor. I'm currently working with Kafka streams and have
> > > > > several questions about Kafka and I can not find answers in the
> > official
> > > > > docs.
> > > > >
> > > > > 1. Why suppress functionality does not work with Hopping windows?
> > How to
> > > > > make it work?
> > > > >
> > > > > Example of the code:
> > > > >
> > > > > KStream<String, String> finalStream = source
> > > > >                 .groupByKey()
> > > > >
> > > > >
> > > >
> > .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
> > > > >                 .reduce((aggValue, newValue) -> newValue,
> > > > > Materialized.with(Serdes.String(), Serdes.String()))
> > > > >
> > > > >
> > > >
> > .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > >                 .toStream();
> > > > >
> > > > > finalStream.print(Printed.toSysOut());
> > > > > finalStream.to(outputTopic);
> > > > >
> > > > > After I run the code above - output stream is empty. There were no
> > > > > errors/exceptions.
> > > > > NOTE: With Tumbling Window the code working as expected.
> > > > > Maybe I simply use it incorrectly?
> > > > >
> > > > > 2. Why with Hopping windows (without suppress) there are duplicates
> > in
> > > > the
> > > > > output stream?
> > > > > E.g., I send one record in the input kstream with Hopping window
> > > > > (duration=30s, advanceBy=2s) but get two same records (duplicate) in
> > the
> > > > > output kstream.
> > > > > Is that an expected behavior? If so, how can I filter/switch off
> > these
> > > > > duplicates?
> > > > >
> > > > > 3. Mainly I'm trying to solve this problem:
> > > > > I have kstream with events inside and events can be repeated
> > > > (duplicates).
> > > > > In the output kstream I would like to receive only unique events for
> > the
> > > > > last 24 hours (window duration) with 1 hour window overlay (window
> > > > > advanceBy).
> > > > > Could you recommend me any examples of code or docs please?
> > > > > I have already read official docs and examples but it was not enough
> > to
> > > > get
> > > > > full understanding of how I can achieve this.
> > > > >
> > > > > Best regards,
> > > > > Viktor Markvardt
> > > > >
> > > >
> > >
> >
>

Re: Streams, Kafka windows

Posted by Viktor Markvardt <vi...@gmail.com>.
Hi John,

Thank you for your assistance!
Your example helped me a lot, and I understand Kafka Streams more clearly now.
Have a nice weekend :)

Best regards,
Viktor Markvardt

Thu, 16 Jan 2020 at 19:29, John Roesler <vv...@apache.org>:

> Hi Viktor,
>
> I’m starting to wonder what exactly “duplicate” means in this context. Can
> you elaborate?
>
> In case it helps, with your window definition, if I send a record with
> timestamp 20, it would actually belong to three different windows:
> [0,30)
> [10,40)
> [20,50)
>
> Because of this, you would (correctly) see three output records for that
> one input, but the outputs wouldn’t be “duplicates” properly, because
> they’d have different keys:
>
> Input:
> Key1: Val1 @ timestamp:20
>
> Output:
> Windowed<Window(0,30),Key1>: 1
> Windowed<Window(10,40),Key1>: 1
> Windowed<Window(20,50),Key1>: 1
>
> Any chance that explains your observation?
>
> Thanks,
> John
>
>
>
> On Thu, Jan 16, 2020, at 10:18, Viktor Markvardt wrote:
> > Hi John,
> >
> > Thanks for answering my questions!
> > I observe behavior which I can not understand.
> > The code is working, but when delay between records larger then window
> > duration I receive duplicated records.
> > With the code below I received duplicated records in the output kstream.
> > Count of duplicate records is always 3. If I change duration/advanceBy
> > count of duplicated records is changing also.
> > Do you have any ideas why duplicated records are received in the output
> > kstream?
> >
> > KStream<String, String> windowedStream = source
> >     .groupByKey()
> >
> >
> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ZERO).advanceBy(Duration.ofSeconds(10)))
> >     .count()
> >
> >
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> >     .toStream();
> >
> >
> > Best regards,
> > Viktor Markvardt
> >
> > чт, 16 янв. 2020 г. в 04:59, John Roesler <vv...@apache.org>:
> >
> > > Hi Viktor,
> > >
> > > I’m not sure why you get two identical outputs in response to a single
> > > record. Regardless, since you say that you want to get a single, final
> > > result for the window and you expect multiple inputs to the windows,
> you
> > > need Suppression.
> > >
> > > My guess is that you just sent one record to try it out and didn’t see
> any
> > > output? This is expected. Just as the window boundaries are defined by
> the
> > > time stamps of the records, not by the current system time,
> suppression is
> > > governed by the timestamp of the records. I.e., a thirty-second window
> is
> > > not actually closed until you see a new record with a timestamp thirty
> > > seconds later.
> > >
> > >  Maybe try just sending a sequence of updates with incrementing
> > > timestamps. If the first record has timestamp T, then you should see an
> > > output when you pass in a record with timestamp T+30.
> > >
> > > Important note: there is a built-in grace period that delays the
> output of
> > > final results after the window ends. For complicated reasons, the
> default
> > > is 24 hours! So you would actually not see an output until you send a
> > > record with timestamp T+30+(24 hours) ! I strongly recommend you set
> the
> > > grace period on TimeWindows to zero for your testing. You can increase
> it
> > > later if you want to tolerate some late-arriving records.
> > >
> > > Thanks,
> > > -John
> > >
> > > On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> > > > Hi,
> > > >
> > > > My name is Viktor. I'm currently working with Kafka streams and have
> > > > several questions about Kafka and I can not find answers in the
> official
> > > > docs.
> > > >
> > > > 1. Why suppress functionality does not work with Hopping windows?
> How to
> > > > make it work?
> > > >
> > > > Example of the code:
> > > >
> > > > KStream<String, String> finalStream = source
> > > >                 .groupByKey()
> > > >
> > > >
> > >
> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
> > > >                 .reduce((aggValue, newValue) -> newValue,
> > > > Materialized.with(Serdes.String(), Serdes.String()))
> > > >
> > > >
> > >
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > >                 .toStream();
> > > >
> > > > finalStream.print(Printed.toSysOut());
> > > > finalStream.to(outputTopic);
> > > >
> > > > After I run the code above - output stream is empty. There were no
> > > > errors/exceptions.
> > > > NOTE: With Tumbling Window the code working as expected.
> > > > Maybe I simply use it incorrectly?
> > > >
> > > > 2. Why with Hopping windows (without suppress) there are duplicates
> in
> > > the
> > > > output stream?
> > > > E.g., I send one record in the input kstream with Hopping window
> > > > (duration=30s, advanceBy=2s) but get two same records (duplicate) in
> the
> > > > output kstream.
> > > > Is that an expected behavior? If so, how can I filter/switch off
> these
> > > > duplicates?
> > > >
> > > > 3. Mainly I'm trying to solve this problem:
> > > > I have kstream with events inside and events can be repeated
> > > (duplicates).
> > > > In the output kstream I would like to receive only unique events for
> the
> > > > last 24 hours (window duration) with 1 hour window overlay (window
> > > > advanceBy).
> > > > Could you recommend me any examples of code or docs please?
> > > > I have already read official docs and examples but it was not enough
> to
> > > get
> > > > full understanding of how I can achieve this.
> > > >
> > > > Best regards,
> > > > Viktor Markvardt
> > > >
> > >
> >
>

Re: Streams, Kafka windows

Posted by John Roesler <vv...@apache.org>.
Hi Viktor,

I’m starting to wonder what exactly “duplicate” means in this context. Can you elaborate?

In case it helps, with your window definition, if I send a record with timestamp 20, it would actually belong to three different windows:
[0,30)
[10,40)
[20,50)

Because of this, you would (correctly) see three output records for that one input, but the outputs wouldn’t be “duplicates” properly, because they’d have different keys:

Input:
Key1: Val1 @ timestamp:20

Output:
Windowed<Window(0,30),Key1>: 1
Windowed<Window(10,40),Key1>: 1
Windowed<Window(20,50),Key1>: 1
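
To make those window bounds visible in the output topic, a minimal sketch
(not from the original thread; windowedStream and outputTopic stand in for
your windowed count stream and sink topic) could flatten the windowed key
into a plain string before writing it out:

windowedStream
    // windowedKey.key() is the original record key; window() exposes the bounds
    .map((windowedKey, count) -> KeyValue.pair(
        windowedKey.key() + "@[" + windowedKey.window().start()
            + "," + windowedKey.window().end() + ")",
        count.toString()))
    .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));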

Any chance that explains your observation?

Thanks,
John



On Thu, Jan 16, 2020, at 10:18, Viktor Markvardt wrote:
> Hi John,
> 
> Thanks for answering my questions!
> I observe behavior which I can not understand.
> The code is working, but when delay between records larger then window
> duration I receive duplicated records.
> With the code below I received duplicated records in the output kstream.
> Count of duplicate records is always 3. If I change duration/advanceBy
> count of duplicated records is changing also.
> Do you have any ideas why duplicated records are received in the output
> kstream?
> 
> KStream<String, String> windowedStream = source
>     .groupByKey()
>     
> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ZERO).advanceBy(Duration.ofSeconds(10)))
>     .count()
>     
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream();
> 
> 
> Best regards,
> Viktor Markvardt
> 
> чт, 16 янв. 2020 г. в 04:59, John Roesler <vv...@apache.org>:
> 
> > Hi Viktor,
> >
> > I’m not sure why you get two identical outputs in response to a single
> > record. Regardless, since you say that you want to get a single, final
> > result for the window and you expect multiple inputs to the windows, you
> > need Suppression.
> >
> > My guess is that you just sent one record to try it out and didn’t see any
> > output? This is expected. Just as the window boundaries are defined by the
> > time stamps of the records, not by the current system time, suppression is
> > governed by the timestamp of the records. I.e., a thirty-second window is
> > not actually closed until you see a new record with a timestamp thirty
> > seconds later.
> >
> >  Maybe try just sending a sequence of updates with incrementing
> > timestamps. If the first record has timestamp T, then you should see an
> > output when you pass in a record with timestamp T+30.
> >
> > Important note: there is a built-in grace period that delays the output of
> > final results after the window ends. For complicated reasons, the default
> > is 24 hours! So you would actually not see an output until you send a
> > record with timestamp T+30+(24 hours) ! I strongly recommend you set the
> > grace period on TimeWindows to zero for your testing. You can increase it
> > later if you want to tolerate some late-arriving records.
> >
> > Thanks,
> > -John
> >
> > On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> > > Hi,
> > >
> > > My name is Viktor. I'm currently working with Kafka streams and have
> > > several questions about Kafka and I can not find answers in the official
> > > docs.
> > >
> > > 1. Why suppress functionality does not work with Hopping windows? How to
> > > make it work?
> > >
> > > Example of the code:
> > >
> > > KStream<String, String> finalStream = source
> > >                 .groupByKey()
> > >
> > >
> > .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
> > >                 .reduce((aggValue, newValue) -> newValue,
> > > Materialized.with(Serdes.String(), Serdes.String()))
> > >
> > >
> > .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > >                 .toStream();
> > >
> > > finalStream.print(Printed.toSysOut());
> > > finalStream.to(outputTopic);
> > >
> > > After I run the code above - output stream is empty. There were no
> > > errors/exceptions.
> > > NOTE: With Tumbling Window the code working as expected.
> > > Maybe I simply use it incorrectly?
> > >
> > > 2. Why with Hopping windows (without suppress) there are duplicates in
> > the
> > > output stream?
> > > E.g., I send one record in the input kstream with Hopping window
> > > (duration=30s, advanceBy=2s) but get two same records (duplicate) in the
> > > output kstream.
> > > Is that an expected behavior? If so, how can I filter/switch off these
> > > duplicates?
> > >
> > > 3. Mainly I'm trying to solve this problem:
> > > I have kstream with events inside and events can be repeated
> > (duplicates).
> > > In the output kstream I would like to receive only unique events for the
> > > last 24 hours (window duration) with 1 hour window overlay (window
> > > advanceBy).
> > > Could you recommend me any examples of code or docs please?
> > > I have already read official docs and examples but it was not enough to
> > get
> > > full understanding of how I can achieve this.
> > >
> > > Best regards,
> > > Viktor Markvardt
> > >
> >
>

Re: Streams, Kafka windows

Posted by Viktor Markvardt <vi...@gmail.com>.
Hi John,

Thanks for answering my questions!
I observe behavior which I cannot understand.
The code is working, but when the delay between records is larger than the
window duration, I receive duplicated records.
With the code below I received duplicated records in the output KStream.
The count of duplicate records is always 3. If I change the duration/advanceBy,
the count of duplicated records changes as well.
Do you have any idea why duplicated records are received in the output
KStream?

KStream<String, String> windowedStream = source
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ZERO).advanceBy(Duration.ofSeconds(10)))
    .count()
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream();


Best regards,
Viktor Markvardt

Thu, 16 Jan 2020 at 04:59, John Roesler <vv...@apache.org>:

> Hi Viktor,
>
> I’m not sure why you get two identical outputs in response to a single
> record. Regardless, since you say that you want to get a single, final
> result for the window and you expect multiple inputs to the windows, you
> need Suppression.
>
> My guess is that you just sent one record to try it out and didn’t see any
> output? This is expected. Just as the window boundaries are defined by the
> time stamps of the records, not by the current system time, suppression is
> governed by the timestamp of the records. I.e., a thirty-second window is
> not actually closed until you see a new record with a timestamp thirty
> seconds later.
>
>  Maybe try just sending a sequence of updates with incrementing
> timestamps. If the first record has timestamp T, then you should see an
> output when you pass in a record with timestamp T+30.
>
> Important note: there is a built-in grace period that delays the output of
> final results after the window ends. For complicated reasons, the default
> is 24 hours! So you would actually not see an output until you send a
> record with timestamp T+30+(24 hours) ! I strongly recommend you set the
> grace period on TimeWindows to zero for your testing. You can increase it
> later if you want to tolerate some late-arriving records.
>
> Thanks,
> -John
>
> On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> > Hi,
> >
> > My name is Viktor. I'm currently working with Kafka streams and have
> > several questions about Kafka and I can not find answers in the official
> > docs.
> >
> > 1. Why suppress functionality does not work with Hopping windows? How to
> > make it work?
> >
> > Example of the code:
> >
> > KStream<String, String> finalStream = source
> >                 .groupByKey()
> >
> >
> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
> >                 .reduce((aggValue, newValue) -> newValue,
> > Materialized.with(Serdes.String(), Serdes.String()))
> >
> >
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> >                 .toStream();
> >
> > finalStream.print(Printed.toSysOut());
> > finalStream.to(outputTopic);
> >
> > After I run the code above - output stream is empty. There were no
> > errors/exceptions.
> > NOTE: With Tumbling Window the code working as expected.
> > Maybe I simply use it incorrectly?
> >
> > 2. Why with Hopping windows (without suppress) there are duplicates in
> the
> > output stream?
> > E.g., I send one record in the input kstream with Hopping window
> > (duration=30s, advanceBy=2s) but get two same records (duplicate) in the
> > output kstream.
> > Is that an expected behavior? If so, how can I filter/switch off these
> > duplicates?
> >
> > 3. Mainly I'm trying to solve this problem:
> > I have kstream with events inside and events can be repeated
> (duplicates).
> > In the output kstream I would like to receive only unique events for the
> > last 24 hours (window duration) with 1 hour window overlay (window
> > advanceBy).
> > Could you recommend me any examples of code or docs please?
> > I have already read official docs and examples but it was not enough to
> get
> > full understanding of how I can achieve this.
> >
> > Best regards,
> > Viktor Markvardt
> >
>

Re: Streams, Kafka windows

Posted by John Roesler <vv...@apache.org>.
Hi Viktor,

I’m not sure why you get two identical outputs in response to a single record. Regardless, since you say that you want to get a single, final result for the window and you expect multiple inputs to the windows, you need Suppression.

My guess is that you just sent one record to try it out and didn’t see any output? This is expected. Just as the window boundaries are defined by the time stamps of the records, not by the current system time, suppression is governed by the timestamp of the records. I.e., a thirty-second window is not actually closed until you see a new record with a timestamp thirty seconds later.

 Maybe try just sending a sequence of updates with incrementing timestamps. If the first record has timestamp T, then you should see an output when you pass in a record with timestamp T+30. 

Important note: there is a built-in grace period that delays the output of final results after the window ends. For complicated reasons, the default is 24 hours! So you would actually not see an output until you send a record with timestamp T+30+(24 hours) ! I strongly recommend you set the grace period on TimeWindows to zero for your testing. You can increase it later if you want to tolerate some late-arriving records. 
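
As a concrete illustration (a sketch based on the 30s/10s hopping window
discussed earlier in this thread, not your exact topology — source and
finalResults are placeholder names), setting the grace period to zero while
testing looks like this:

KStream<Windowed<String>, Long> finalResults = source
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(30))
        .advanceBy(Duration.ofSeconds(10))
        // no grace period: suppress emits once stream time passes the window end
        .grace(Duration.ZERO))
    .count()
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream();

With that in place, feeding records with timestamps T and T+30 should produce
the final result for the first window right away, instead of 24 hours later.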

Thanks,
-John

On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> Hi,
> 
> My name is Viktor. I'm currently working with Kafka streams and have
> several questions about Kafka and I can not find answers in the official
> docs.
> 
> 1. Why suppress functionality does not work with Hopping windows? How to
> make it work?
> 
> Example of the code:
> 
> KStream<String, String> finalStream = source
>                 .groupByKey()
> 
> .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
>                 .reduce((aggValue, newValue) -> newValue,
> Materialized.with(Serdes.String(), Serdes.String()))
> 
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>                 .toStream();
> 
> finalStream.print(Printed.toSysOut());
> finalStream.to(outputTopic);
> 
> After I run the code above - output stream is empty. There were no
> errors/exceptions.
> NOTE: With Tumbling Window the code working as expected.
> Maybe I simply use it incorrectly?
> 
> 2. Why with Hopping windows (without suppress) there are duplicates in the
> output stream?
> E.g., I send one record in the input kstream with Hopping window
> (duration=30s, advanceBy=2s) but get two same records (duplicate) in the
> output kstream.
> Is that an expected behavior? If so, how can I filter/switch off these
> duplicates?
> 
> 3. Mainly I'm trying to solve this problem:
> I have kstream with events inside and events can be repeated (duplicates).
> In the output kstream I would like to receive only unique events for the
> last 24 hours (window duration) with 1 hour window overlay (window
> advanceBy).
> Could you recommend me any examples of code or docs please?
> I have already read official docs and examples but it was not enough to get
> full understanding of how I can achieve this.
> 
> Best regards,
> Viktor Markvardt
>