Posted to users@kafka.apache.org by "Matthias J. Sax" <ma...@confluent.io> on 2019/01/02 14:27:40 UTC

Re: Whey does the window final result is not emitted after the window has elapsed?

> After some time, the window closes.

This is not correct. Windows are based on event-time, and because no new
input record is processed, the window is not closed. That is the reason
why you don't get any output. Only a new input record can advance
"stream time" and close the window.

In practice, when data flows continuously, this should not be an issue
though.
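
For illustration, here is a minimal sketch (not part of the original example)
that shows this with TopologyTestDriver from kafka-streams-test-utils. It
assumes the word-count topology quoted below is available via `builder`; the
application id, bootstrap servers, and record keys are placeholders:

import java.util.Properties;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-demo");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(
    "TextLinesTopic", new StringSerializer(), new StringSerializer());

// Timestamp 0 ms opens the window [0, 3000); suppress() emits nothing yet,
// no matter how long we wait, because stream time has not moved.
driver.pipeInput(factory.create("TextLinesTopic", "someKey", "hello", 0L));

// A record at 5000 ms advances stream time past the window end plus grace,
// so the final count for the first window is forwarded (and printed).
driver.pipeInput(factory.create("TextLinesTopic", "someKey", "hello again", 5000L));

driver.close();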


-Matthias

On 12/31/18 8:22 AM, jingguo yao wrote:
> Sorry for my typo in the mail. "Whey" should be "Why" in "Whey does
> the window final result is not emitted after the window has elapsed?"
> 
> I have browsed the Kafka source code and found the cause of the
> mentioned behaviour.
> 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor
> has the following code:
> 
> @Override
> public void process(final K key, final Change<V> value) {
>   buffer(key, value);
>   enforceConstraints();
> }
> 
> In the above code, the enforceConstraints invocation emits window results
> only under certain conditions.
>
> After the process method handles the first record, the window begins.
> After some time, the window closes. But until process is invoked again
> (triggered by receiving another record), there is no chance to emit the
> window result.
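>
> For intuition, here is a simplified, self-contained model of the
> untilWindowCloses idea (this is not the actual Kafka implementation; the
> class and member names below are made up for illustration):
>
> import java.util.NavigableMap;
> import java.util.SortedMap;
> import java.util.TreeMap;
>
> class SuppressModel {
>   // Final results are buffered per window and only forwarded once the
>   // observed stream time (the largest record timestamp seen so far) has
>   // reached the window end. Crucially, this check runs only inside
>   // process(), i.e. only when another record arrives.
>   private final NavigableMap<Long, Long> bufferByWindowEnd = new TreeMap<>();
>   private long observedStreamTime = Long.MIN_VALUE;
>
>   void process(final long recordTimestamp, final long windowEnd, final long count) {
>     bufferByWindowEnd.put(windowEnd, count);
>     observedStreamTime = Math.max(observedStreamTime, recordTimestamp);
>     final SortedMap<Long, Long> closed = bufferByWindowEnd.headMap(observedStreamTime, true);
>     closed.forEach((end, c) -> System.out.println("final count for window ending at " + end + ": " + c));
>     closed.clear();  // emitted; newer windows stay buffered until stream time advances again
>   }
> }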
> 
> Are there some configuration options to emit the window result without
> waiting for another record to arrive?
> 
> And I am using Kafka 2.1.0 as contained in Confluent Open Source Edition
> 5.1.0.
> 
> On Sun, Dec 30, 2018 at 10:53 PM jingguo yao <ya...@gmail.com> wrote:
>>
>> I followed [1] to code a simple example to try the suppress operator.
>>
>> Here is the simple code:
>>
>> final Serde<String> stringSerde = Serdes.String();
>> final StreamsBuilder builder = new StreamsBuilder();
>> builder.stream("TextLinesTopic",
>>     Consumed.with(Serdes.String(), Serdes.String()))
>>   .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
>>   .groupBy((key, word) -> word,
>>     Grouped.keySerde(stringSerde).withValueSerde(stringSerde))
>>   .windowedBy(TimeWindows.of(Duration.ofSeconds(3)).grace(Duration.ofMillis(0)))
>>   .count(Materialized.with(Serdes.String(), Serdes.Long()))
>>   .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>>   .toStream()
>>   .foreach(
>>       (key, value) -> {
>>         System.out.printf("key: %s, value: %d\n", key, value);
>>       });
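>>
>> (For reference, here is a sketch of the configuration and startup assumed
>> around the snippet above; the application id and bootstrap servers are
>> placeholders, and the two tuning settings are the ones described below:)
>>
>> import java.util.Properties;
>> import org.apache.kafka.streams.KafkaStreams;
>> import org.apache.kafka.streams.StreamsConfig;
>>
>> final Properties props = new Properties();
>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-suppress-demo");
>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
>>
>> final KafkaStreams streams = new KafkaStreams(builder.build(), props);
>> streams.start();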
>>
>> I set commit.interval.ms to 1 and cache.max.bytes.buffering to 0. If I
>> send one text line "hello", nothing is printed even if I wait for more
>> than 3 seconds (the window size). Since more time than the window size
>> has elapsed, I think the key and value should be printed.
>>
>> But if I send another text line "hello", key and value will be
>> printed.
>>
>> Can anyone explain this behavior? I have browsed the Kafka
>> documentation. But I can't find an explanation.
>>
>> [1] http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
>>
>>
>> --
>> Jingguo
> 
> 
> 


Re: Whey does the window final result is not emitted after the window has elapsed?

Posted by jingguo yao <ya...@gmail.com>.
Guozhang:

Thanks for your kind help.

On Sat, Jan 5, 2019 at 3:28 AM Guozhang Wang <wa...@gmail.com> wrote:
>
> Thanks for the detailed description.
>
> 1) Yes, the stream time can be advanced by any record.
> 2) Given your description, another way to work around the situation is to
> have your class send a final record with its timestamp set to the class end
> time plus a small delta (think of it as a sentinel "tick" record just for
> advancing the clock), so that the stream time can still be advanced at the
> end of the last class.
>
> Guozhang
>
>
> On Thu, Jan 3, 2019 at 11:43 PM jingguo yao <ya...@gmail.com> wrote:
>
> > Guozhang:
> >
> > Yes, my case is a real production scenario.
> >
> > I am monitoring on-line live-broadcast classes. I need to have a
> > summary of each 5-minute period of each class. Each class has a
> > classroom ID. I report class activity data to a Kafka topic. The
> > classroom ID is used to partition the data. Here are my Kafka
> > streaming steps:
> >
> > 1. groupByKey: with classroom id.
> > 2. windowedBy: with a 5-minute tumbling window.
> > 3. aggregate: do a summary over the window.
> > 4. Send the aggregate result to other external systems.
> >
> > Each class is about one hour long. After the class ends, no data
> > will be sent for that class. Each night, there are dozens of
> > classes.
> >
> > I have done more tests with my code. It seems that the record which
> > advances the window for class A does not need to belong to class A: a
> > new record for any class can advance the window. Is this behavior
> > guaranteed by Kafka Streams? Even if it is guaranteed, the last window
> > of the last class each night can't be delivered until a new record
> > arrives for a new class the next night.
> >
> >
> > On Fri, Jan 4, 2019 at 2:59 AM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > >
> > > Hello Jingguo,
> > >
> > > Is this case (i.e. you only have data over 57 minutes, and no new data
> > > afterwards) a real production scenario? In stream processing we usually
> > > expect the input data to stream in continuously, and I'm curious to learn
> > > your use case better and why it would not have further data after a
> > > period of time.
> > >
> > > ATM, if you want to really work around this issue you can use a
> > > system-time based `punctuate` call, which is lower-level functionality
> > > than the `suppress` call in DSL.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, Jan 3, 2019 at 6:50 AM jingguo yao <ya...@gmail.com> wrote:
> > >
> > > > Hi, Matthias
> > > >
> > > > I am doing a 5-minute tumbling window analysis over a 57-minute data
> > > > flow. And I want only one final result per window. So I need suppress.
> > > > The 57-minute period can be divided into about 12 windows. The results
> > > > of the first 11 windows can be delivered downstream. But the final
> > > > result for the last 2-minute window can never be delivered downstream
> > > > since there is no new record to advance the window.
> > > >
> > > > Is there any workaround to deliver the result for the last window in
> > > > my situation?
> > > >
> > > > -- Jingguo
> > > >
> > > > On Wed, Jan 2, 2019 at 10:27 PM Matthias J. Sax <ma...@confluent.io> wrote:
> > > > >
> > > > > > After some time, the window closes.
> > > > >
> > > > > This is not correct. Windows are based on event-time, and because no
> > new
> > > > > input record is processed, the window is not closed. That is the
> > reason
> > > > > why you don't get any output. Only a new input record can advance
> > > > > "stream time" and close the window.
> > > > >
> > > > > In practice, when data flows continuously, this should not be a issue
> > > > > though.
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 12/31/18 8:22 AM, jingguo yao wrote:
> > > > > > Sorry for my typo in the mail. "Whey" should be "Why" in "Whey does
> > > > > > the window final result is not emitted after the window has
> > elapsed?"
> > > > > >
> > > > > > I have browsed the Kafka source code and found the cause of the
> > > > > > mentioned behaviour.
> > > > > >
> > > > > >
> > > >
> > org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor
> > > > > > has the following code:
> > > > > >
> > > > > > @Override
> > > > > > public void process(final K key, final Change<V> value) {
> > > > > >   buffer(key, value);
> > > > > >   enforceConstraints();
> > > > > > }
> > > > > >
> > > > > > enforceConstraints method invocation emits window results under
> > some
> > > > > > conditions in the above code.
> > > > > >
> > > > > > After process method processes the first record, the window begins.
> > > > > > After some time, the window closes. But before process is invoked
> > > > > > again (triggered by receiving another record), there is no chance
> > to
> > > > > > emit the window result.
> > > > > >
> > > > > > Are there some configuration options to emit the window result
> > without
> > > > > > waiting for another record to arrive?
> > > > > >
> > > > > > And I using Kafka 2.1.0 contained in Confluent Open Source Edition
> > > > > > 5.1.0.
> > > > > >
> > > > > > jingguo yao <ya...@gmail.com> 于2018年12月30日周日 下午10:53写道:
> > > > > >>
> > > > > >> I followed [1] to code a simple example to try suppress operator.
> > > > > >>
> > > > > >> Here is the simple code:
> > > > > >>
> > > > > >> final Serde<String> stringSerde = Serdes.String();
> > > > > >> final StreamsBuilder builder = new StreamsBuilder();
> > > > > >> builder.stream("TextLinesTopic", Consumed.with(Serdes.String(),
> > > > > >> Serdes.String()))
> > > > > >>   .flatMapValues(value ->
> > > > Arrays.asList(value.toLowerCase().split("\\W+")))
> > > > > >>   .groupBy((key, word) -> word,
> > > > > >> Grouped.keySerde(stringSerde).withValueSerde(stringSerde))
> > > > > >>
> > > >
> > .windowedBy(TimeWindows.of(Duration.ofSeconds(3)).grace(Duration.ofMillis(0)))
> > > > > >>   .count(Materialized.with(Serdes.String(), Serdes.Long()))
> > > > > >>
> > > >
> > .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > > >>   .toStream()
> > > > > >>   .foreach(
> > > > > >>       (key, value) -> {
> > > > > >>         System.out.printf("key: %s, value: %d\n", key, value);
> > > > > >>       });
> > > > > >>
> > > > > >> I set commit.interval.ms to 1 and cache.max.bytes.buffering to
> > 0. If
> > > > I
> > > > > >> send one text line "hello", nothing will be printed even I wait
> > for
> > > > > >> more than 3 seconds (the window size). Since the time longer than
> > the
> > > > > >> window size has elapsed, I think that key and value should be
> > printed.
> > > > > >>
> > > > > >> But if I send another text line "hello", key and value will be
> > > > > >> printed.
> > > > > >>
> > > > > >> Can anyone explain this behavior? I have browsed the Kafka
> > > > > >> documentation. But I can't find an explanation.
> > > > > >>
> > > > > >> [1]
> > > >
> > http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > >> Jingguo
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Jingguo
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> >
> >
> >
> > --
> > Jingguo
> >
>
>
> --
> -- Guozhang



-- 
Jingguo

Re: Whey does the window final result is not emitted after the window has elapsed?

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks for the detailed description.

1) Yes, the stream time can be advanced by any record.
2) Given your description, another way to work around the situation is to
have your class send a final record with its timestamp set to the class end
time plus a small delta (think of it as a sentinel "tick" record just for
advancing the clock), so that the stream time can still be advanced at the
end of the last class.
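
For reference, a minimal sketch of such a sentinel "tick" record, assuming a
plain producer writing to the same activity topic; the topic name, classroom
id, value, and times below are placeholders, not taken from the thread:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

final long classEndTimeMs = 1546617600000L;  // placeholder: event time when the class ended
final String classroomId = "classroom-42";   // placeholder classroom key

try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
  // The record timestamp (third constructor argument) is what advances stream
  // time; the value can be anything the topology ignores or filters out.
  producer.send(new ProducerRecord<>("class-activity", null, classEndTimeMs + 1,
      classroomId, "tick"));
}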

Guozhang


On Thu, Jan 3, 2019 at 11:43 PM jingguo yao <ya...@gmail.com> wrote:

> Guozhang:
>
> Yes, my case is a  real production scenario.
>
> I am monitoring on-line live-broadcast classes. I need to have a
> summary of each 5-minute period for one class. Each class has a
> classroom Id. I report class activity data to a Kafka topic. Classroom
> id is used to partition these data. Here are my Kafka streaming steps:
>
> 1. groupByKey: with classroom id.
> 2. windowedBy: with a 5-minute tumbling window.
> 3. aggregate: do a summary over the window.
> 4. Send the aggregate result to other external systems.
>
> Each class has about one-hour length. After the class ends, no data
> will be sent for the class. At each night, there will be dozens of
> classes.
>
> I have done more tests with my code. It seems that a new record to
> advance the window for class A does not need to be a new record for
> class A. A new record for any class can advance the window. Is this
> behavior guaranteed by Kafka streams? Even if this behavior is
> guaranteed, the last window for the last class at one night can't be
> delivered until a new record arrives for a new class tomorrow night.
>
>
> Guozhang Wang <wa...@gmail.com> 于2019年1月4日周五 上午2:59写道:
>
> >
> > Hello Jingguo,
> >
> > Is this case (i.e. you only have data over 57 minutes, and no new data
> > afterwards) a real production scenario? In stream processing we usually
> > expect the input data stream in continuously, and I'm curious to learn
> your
> > use case better and why it would not have further data after a period of
> > time.
> >
> > ATM, if you want to really walk around this issue you can use system-time
> > based `punctuate` call, which is a lower-level functionality than the
> > `suppress` call in DSL.
> >
> >
> > Guozhang
> >
> >
> > On Thu, Jan 3, 2019 at 6:50 AM jingguo yao <ya...@gmail.com> wrote:
> >
> > > Hi, Matthias
> > >
> > > I am doing a 5-minute tumbling window analysis over a 57-minute data
> > > flow. And I want only one final result per window. So I need suppress.
> > > The 57-minute period can be divided into about 12 windows. The results
> > > of the first 11 windows can be delivered downstream. But the final
> > > result for the last 2-minute window can never be delivered downstream
> > > since there is no a new record to advance the window.
> > >
> > > Is there any workaround to deliver the result for the last window in
> > > my situation?
> > >
> > > -- Jingguo
> > >
> > > Matthias J. Sax <ma...@confluent.io> 于2019年1月2日周三 下午10:27写道:
> > > >
> > > > > After some time, the window closes.
> > > >
> > > > This is not correct. Windows are based on event-time, and because no
> new
> > > > input record is processed, the window is not closed. That is the
> reason
> > > > why you don't get any output. Only a new input record can advance
> > > > "stream time" and close the window.
> > > >
> > > > In practice, when data flows continuously, this should not be a issue
> > > > though.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 12/31/18 8:22 AM, jingguo yao wrote:
> > > > > Sorry for my typo in the mail. "Whey" should be "Why" in "Whey does
> > > > > the window final result is not emitted after the window has
> elapsed?"
> > > > >
> > > > > I have browsed the Kafka source code and found the cause of the
> > > > > mentioned behaviour.
> > > > >
> > > > >
> > >
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor
> > > > > has the following code:
> > > > >
> > > > > @Override
> > > > > public void process(final K key, final Change<V> value) {
> > > > >   buffer(key, value);
> > > > >   enforceConstraints();
> > > > > }
> > > > >
> > > > > enforceConstraints method invocation emits window results under
> some
> > > > > conditions in the above code.
> > > > >
> > > > > After process method processes the first record, the window begins.
> > > > > After some time, the window closes. But before process is invoked
> > > > > again (triggered by receiving another record), there is no chance
> to
> > > > > emit the window result.
> > > > >
> > > > > Are there some configuration options to emit the window result
> without
> > > > > waiting for another record to arrive?
> > > > >
> > > > > And I using Kafka 2.1.0 contained in Confluent Open Source Edition
> > > > > 5.1.0.
> > > > >
> > > > > jingguo yao <ya...@gmail.com> 于2018年12月30日周日 下午10:53写道:
> > > > >>
> > > > >> I followed [1] to code a simple example to try suppress operator.
> > > > >>
> > > > >> Here is the simple code:
> > > > >>
> > > > >> final Serde<String> stringSerde = Serdes.String();
> > > > >> final StreamsBuilder builder = new StreamsBuilder();
> > > > >> builder.stream("TextLinesTopic", Consumed.with(Serdes.String(),
> > > > >> Serdes.String()))
> > > > >>   .flatMapValues(value ->
> > > Arrays.asList(value.toLowerCase().split("\\W+")))
> > > > >>   .groupBy((key, word) -> word,
> > > > >> Grouped.keySerde(stringSerde).withValueSerde(stringSerde))
> > > > >>
> > >
> .windowedBy(TimeWindows.of(Duration.ofSeconds(3)).grace(Duration.ofMillis(0)))
> > > > >>   .count(Materialized.with(Serdes.String(), Serdes.Long()))
> > > > >>
> > >
> .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > >>   .toStream()
> > > > >>   .foreach(
> > > > >>       (key, value) -> {
> > > > >>         System.out.printf("key: %s, value: %d\n", key, value);
> > > > >>       });
> > > > >>
> > > > >> I set commit.interval.ms to 1 and cache.max.bytes.buffering to
> 0. If
> > > I
> > > > >> send one text line "hello", nothing will be printed even I wait
> for
> > > > >> more than 3 seconds (the window size). Since the time longer than
> the
> > > > >> window size has elapsed, I think that key and value should be
> printed.
> > > > >>
> > > > >> But if I send another text line "hello", key and value will be
> > > > >> printed.
> > > > >>
> > > > >> Can anyone explain this behavior? I have browsed the Kafka
> > > > >> documentation. But I can't find an explanation.
> > > > >>
> > > > >> [1]
> > >
> http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
> > > > >>
> > > > >>
> > > > >> --
> > > > >> Jingguo
> > > > >
> > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > > Jingguo
> > >
> >
> >
> > --
> > -- Guozhang
>
>
>
> --
> Jingguo
>


-- 
-- Guozhang

Re: Whey does the window final result is not emitted after the window has elapsed?

Posted by jingguo yao <ya...@gmail.com>.
Guozhang:

Yes, my case is a real production scenario.

I am monitoring on-line live-broadcast classes. I need to have a
summary of each 5-minute period of each class. Each class has a
classroom ID. I report class activity data to a Kafka topic. The
classroom ID is used to partition the data. Here are my Kafka
streaming steps:

1. groupByKey: with classroom id.
2. windowedBy: with a 5-minute tumbling window.
3. aggregate: do a summary over the window.
4. Send the aggregate result to other external systems.
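
In DSL terms these steps correspond roughly to the sketch below (with
suppress() included for the window-final result discussed in this thread);
the topic name, the Activity/Summary value types, their serdes, and the
downstream call are placeholders, not taken from the original description:

builder.stream("class-activity", Consumed.with(Serdes.String(), activitySerde))
  .groupByKey()                                                  // 1. key = classroom id
  .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ZERO))  // 2. tumbling window
  .aggregate(Summary::new,                                       // 3. summary over the window
      (classroomId, activity, summary) -> { summary.add(activity); return summary; },
      Materialized.with(Serdes.String(), summarySerde))
  .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
  .toStream()
  .foreach((windowedClassroomId, summary) -> externalSystem.send(summary));  // 4. deliver downstream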

Each class is about one hour long. After the class ends, no data
will be sent for that class. Each night, there are dozens of
classes.

I have done more tests with my code. It seems that the record which
advances the window for class A does not need to belong to class A: a
new record for any class can advance the window. Is this behavior
guaranteed by Kafka Streams? Even if it is guaranteed, the last window
of the last class each night can't be delivered until a new record
arrives for a new class the next night.


On Fri, Jan 4, 2019 at 2:59 AM Guozhang Wang <wa...@gmail.com> wrote:

>
> Hello Jingguo,
>
> Is this case (i.e. you only have data over 57 minutes, and no new data
> afterwards) a real production scenario? In stream processing we usually
> expect the input data stream in continuously, and I'm curious to learn your
> use case better and why it would not have further data after a period of
> time.
>
> ATM, if you want to really walk around this issue you can use system-time
> based `punctuate` call, which is a lower-level functionality than the
> `suppress` call in DSL.
>
>
> Guozhang
>
>
> On Thu, Jan 3, 2019 at 6:50 AM jingguo yao <ya...@gmail.com> wrote:
>
> > Hi, Matthias
> >
> > I am doing a 5-minute tumbling window analysis over a 57-minute data
> > flow. And I want only one final result per window. So I need suppress.
> > The 57-minute period can be divided into about 12 windows. The results
> > of the first 11 windows can be delivered downstream. But the final
> > result for the last 2-minute window can never be delivered downstream
> > since there is no a new record to advance the window.
> >
> > Is there any workaround to deliver the result for the last window in
> > my situation?
> >
> > -- Jingguo
> >
> > Matthias J. Sax <ma...@confluent.io> 于2019年1月2日周三 下午10:27写道:
> > >
> > > > After some time, the window closes.
> > >
> > > This is not correct. Windows are based on event-time, and because no new
> > > input record is processed, the window is not closed. That is the reason
> > > why you don't get any output. Only a new input record can advance
> > > "stream time" and close the window.
> > >
> > > In practice, when data flows continuously, this should not be a issue
> > > though.
> > >
> > >
> > > -Matthias
> > >
> > > On 12/31/18 8:22 AM, jingguo yao wrote:
> > > > Sorry for my typo in the mail. "Whey" should be "Why" in "Whey does
> > > > the window final result is not emitted after the window has elapsed?"
> > > >
> > > > I have browsed the Kafka source code and found the cause of the
> > > > mentioned behaviour.
> > > >
> > > >
> > org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor
> > > > has the following code:
> > > >
> > > > @Override
> > > > public void process(final K key, final Change<V> value) {
> > > >   buffer(key, value);
> > > >   enforceConstraints();
> > > > }
> > > >
> > > > enforceConstraints method invocation emits window results under some
> > > > conditions in the above code.
> > > >
> > > > After process method processes the first record, the window begins.
> > > > After some time, the window closes. But before process is invoked
> > > > again (triggered by receiving another record), there is no chance to
> > > > emit the window result.
> > > >
> > > > Are there some configuration options to emit the window result without
> > > > waiting for another record to arrive?
> > > >
> > > > And I using Kafka 2.1.0 contained in Confluent Open Source Edition
> > > > 5.1.0.
> > > >
> > > > jingguo yao <ya...@gmail.com> 于2018年12月30日周日 下午10:53写道:
> > > >>
> > > >> I followed [1] to code a simple example to try suppress operator.
> > > >>
> > > >> Here is the simple code:
> > > >>
> > > >> final Serde<String> stringSerde = Serdes.String();
> > > >> final StreamsBuilder builder = new StreamsBuilder();
> > > >> builder.stream("TextLinesTopic", Consumed.with(Serdes.String(),
> > > >> Serdes.String()))
> > > >>   .flatMapValues(value ->
> > Arrays.asList(value.toLowerCase().split("\\W+")))
> > > >>   .groupBy((key, word) -> word,
> > > >> Grouped.keySerde(stringSerde).withValueSerde(stringSerde))
> > > >>
> >  .windowedBy(TimeWindows.of(Duration.ofSeconds(3)).grace(Duration.ofMillis(0)))
> > > >>   .count(Materialized.with(Serdes.String(), Serdes.Long()))
> > > >>
> >  .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > >>   .toStream()
> > > >>   .foreach(
> > > >>       (key, value) -> {
> > > >>         System.out.printf("key: %s, value: %d\n", key, value);
> > > >>       });
> > > >>
> > > >> I set commit.interval.ms to 1 and cache.max.bytes.buffering to 0. If
> > I
> > > >> send one text line "hello", nothing will be printed even I wait for
> > > >> more than 3 seconds (the window size). Since the time longer than the
> > > >> window size has elapsed, I think that key and value should be printed.
> > > >>
> > > >> But if I send another text line "hello", key and value will be
> > > >> printed.
> > > >>
> > > >> Can anyone explain this behavior? I have browsed the Kafka
> > > >> documentation. But I can't find an explanation.
> > > >>
> > > >> [1]
> > http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
> > > >>
> > > >>
> > > >> --
> > > >> Jingguo
> > > >
> > > >
> > > >
> > >
> >
> >
> > --
> > Jingguo
> >
>
>
> --
> -- Guozhang



--
Jingguo

Re: Whey does the window final result is not emitted after the window has elapsed?

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Jingguo,

Is this case (i.e. you only have data over 57 minutes, and no new data
afterwards) a real production scenario? In stream processing we usually
expect the input data to stream in continuously, and I'm curious to learn your
use case better and why it would not have further data after a period of
time.

ATM, if you want to really work around this issue you can use a
system-time based `punctuate` call, which is lower-level functionality
than the `suppress` call in DSL.
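
For reference, a minimal sketch of that approach; the schedule interval is a
placeholder, and the store scan and forwarding logic are only indicated in the
comment, since they depend on the topology:

import java.time.Duration;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Inside init(final ProcessorContext context) of a custom Processor or
// Transformer attached next to the windowed aggregation:
context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
  // Scan the windowed state store for windows whose end (plus grace) has
  // already passed and forward their results via context.forward(...), so the
  // last window of the night is emitted even though no further input arrives.
});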


Guozhang


On Thu, Jan 3, 2019 at 6:50 AM jingguo yao <ya...@gmail.com> wrote:

> Hi, Matthias
>
> I am doing a 5-minute tumbling window analysis over a 57-minute data
> flow. And I want only one final result per window. So I need suppress.
> The 57-minute period can be divided into about 12 windows. The results
> of the first 11 windows can be delivered downstream. But the final
> result for the last 2-minute window can never be delivered downstream
> since there is no a new record to advance the window.
>
> Is there any workaround to deliver the result for the last window in
> my situation?
>
> -- Jingguo
>
> Matthias J. Sax <ma...@confluent.io> 于2019年1月2日周三 下午10:27写道:
> >
> > > After some time, the window closes.
> >
> > This is not correct. Windows are based on event-time, and because no new
> > input record is processed, the window is not closed. That is the reason
> > why you don't get any output. Only a new input record can advance
> > "stream time" and close the window.
> >
> > In practice, when data flows continuously, this should not be a issue
> > though.
> >
> >
> > -Matthias
> >
> > On 12/31/18 8:22 AM, jingguo yao wrote:
> > > Sorry for my typo in the mail. "Whey" should be "Why" in "Whey does
> > > the window final result is not emitted after the window has elapsed?"
> > >
> > > I have browsed the Kafka source code and found the cause of the
> > > mentioned behaviour.
> > >
> > >
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor
> > > has the following code:
> > >
> > > @Override
> > > public void process(final K key, final Change<V> value) {
> > >   buffer(key, value);
> > >   enforceConstraints();
> > > }
> > >
> > > enforceConstraints method invocation emits window results under some
> > > conditions in the above code.
> > >
> > > After process method processes the first record, the window begins.
> > > After some time, the window closes. But before process is invoked
> > > again (triggered by receiving another record), there is no chance to
> > > emit the window result.
> > >
> > > Are there some configuration options to emit the window result without
> > > waiting for another record to arrive?
> > >
> > > And I using Kafka 2.1.0 contained in Confluent Open Source Edition
> > > 5.1.0.
> > >
> > > jingguo yao <ya...@gmail.com> 于2018年12月30日周日 下午10:53写道:
> > >>
> > >> I followed [1] to code a simple example to try suppress operator.
> > >>
> > >> Here is the simple code:
> > >>
> > >> final Serde<String> stringSerde = Serdes.String();
> > >> final StreamsBuilder builder = new StreamsBuilder();
> > >> builder.stream("TextLinesTopic", Consumed.with(Serdes.String(),
> > >> Serdes.String()))
> > >>   .flatMapValues(value ->
> Arrays.asList(value.toLowerCase().split("\\W+")))
> > >>   .groupBy((key, word) -> word,
> > >> Grouped.keySerde(stringSerde).withValueSerde(stringSerde))
> > >>
>  .windowedBy(TimeWindows.of(Duration.ofSeconds(3)).grace(Duration.ofMillis(0)))
> > >>   .count(Materialized.with(Serdes.String(), Serdes.Long()))
> > >>
>  .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > >>   .toStream()
> > >>   .foreach(
> > >>       (key, value) -> {
> > >>         System.out.printf("key: %s, value: %d\n", key, value);
> > >>       });
> > >>
> > >> I set commit.interval.ms to 1 and cache.max.bytes.buffering to 0. If
> I
> > >> send one text line "hello", nothing will be printed even I wait for
> > >> more than 3 seconds (the window size). Since the time longer than the
> > >> window size has elapsed, I think that key and value should be printed.
> > >>
> > >> But if I send another text line "hello", key and value will be
> > >> printed.
> > >>
> > >> Can anyone explain this behavior? I have browsed the Kafka
> > >> documentation. But I can't find an explanation.
> > >>
> > >> [1]
> http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
> > >>
> > >>
> > >> --
> > >> Jingguo
> > >
> > >
> > >
> >
>
>
> --
> Jingguo
>


-- 
-- Guozhang

Re: Whey does the window final result is not emitted after the window has elapsed?

Posted by jingguo yao <ya...@gmail.com>.
Hi, Matthias

I am doing a 5-minute tumbling window analysis over a 57-minute data
flow. And I want only one final result per window. So I need suppress.
The 57-minute period can be divided into about 12 windows. The results
of the first 11 windows can be delivered downstream. But the final
result for the last 2-minute window can never be delivered downstream
since there is no new record to advance the window.

Is there any workaround to deliver the result for the last window in
my situation?

-- Jingguo

On Wed, Jan 2, 2019 at 10:27 PM Matthias J. Sax <ma...@confluent.io> wrote:
>
> > After some time, the window closes.
>
> This is not correct. Windows are based on event-time, and because no new
> input record is processed, the window is not closed. That is the reason
> why you don't get any output. Only a new input record can advance
> "stream time" and close the window.
>
> In practice, when data flows continuously, this should not be a issue
> though.
>
>
> -Matthias
>
> On 12/31/18 8:22 AM, jingguo yao wrote:
> > Sorry for my typo in the mail. "Whey" should be "Why" in "Whey does
> > the window final result is not emitted after the window has elapsed?"
> >
> > I have browsed the Kafka source code and found the cause of the
> > mentioned behaviour.
> >
> > org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor
> > has the following code:
> >
> > @Override
> > public void process(final K key, final Change<V> value) {
> >   buffer(key, value);
> >   enforceConstraints();
> > }
> >
> > enforceConstraints method invocation emits window results under some
> > conditions in the above code.
> >
> > After process method processes the first record, the window begins.
> > After some time, the window closes. But before process is invoked
> > again (triggered by receiving another record), there is no chance to
> > emit the window result.
> >
> > Are there some configuration options to emit the window result without
> > waiting for another record to arrive?
> >
> > And I using Kafka 2.1.0 contained in Confluent Open Source Edition
> > 5.1.0.
> >
> > jingguo yao <ya...@gmail.com> 于2018年12月30日周日 下午10:53写道:
> >>
> >> I followed [1] to code a simple example to try suppress operator.
> >>
> >> Here is the simple code:
> >>
> >> final Serde<String> stringSerde = Serdes.String();
> >> final StreamsBuilder builder = new StreamsBuilder();
> >> builder.stream("TextLinesTopic", Consumed.with(Serdes.String(),
> >> Serdes.String()))
> >>   .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
> >>   .groupBy((key, word) -> word,
> >> Grouped.keySerde(stringSerde).withValueSerde(stringSerde))
> >>   .windowedBy(TimeWindows.of(Duration.ofSeconds(3)).grace(Duration.ofMillis(0)))
> >>   .count(Materialized.with(Serdes.String(), Serdes.Long()))
> >>   .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> >>   .toStream()
> >>   .foreach(
> >>       (key, value) -> {
> >>         System.out.printf("key: %s, value: %d\n", key, value);
> >>       });
> >>
> >> I set commit.interval.ms to 1 and cache.max.bytes.buffering to 0. If I
> >> send one text line "hello", nothing will be printed even I wait for
> >> more than 3 seconds (the window size). Since the time longer than the
> >> window size has elapsed, I think that key and value should be printed.
> >>
> >> But if I send another text line "hello", key and value will be
> >> printed.
> >>
> >> Can anyone explain this behavior? I have browsed the Kafka
> >> documentation. But I can't find an explanation.
> >>
> >> [1] http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
> >>
> >>
> >> --
> >> Jingguo
> >
> >
> >
>


-- 
Jingguo