You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by 杰 杨 <fu...@live.com> on 2018/03/09 13:36:17 UTC

kafka steams with TimeWindows ,incorrect result

Hi:
I used TimeWindow for aggregate data in kafka.

this is code snippet ;

  view.flatMap(new MultipleKeyValueMapper(client)).groupByKey(Serialized.with(Serdes.String(),
                Serdes.serdeFrom(new CountInfoSerializer(), new CountInfoDeserializer())))
        .windowedBy(TimeWindows.of(60000)).reduce(new Reducer<CountInfo>() {
            @Override
            public CountInfo apply(CountInfo value1, CountInfo value2) {
                return new CountInfo(value1.start + value2.start, value1.active + value2.active, value1.fresh + value2.fresh);
            }
        }) .toStream(new KeyValueMapper<Windowed<String>, CountInfo, String>() {
            @Override
            public String apply(Windowed<String> key, CountInfo value) {
                return key.key();
            }
        }).print(Printed.toSysOut());

        KafkaStreams streams = new KafkaStreams(builder.build(), KStreamReducer.getConf());
        streams.start();

and I test 30000 data in kafka .
and I print key value .


[KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_21@1520601300000/1520601360000], CountInfo{start=12179, active=12179, fresh=12179}
[KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000], CountInfo{start=12179, active=12179, fresh=12179}
[KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_21@1520601300000/1520601360000], CountInfo{start=30000, active=30000, fresh=30000}
[KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000], CountInfo{start=30000, active=30000, fresh=30000}
why in one window duration will be print two result but not one result ?

________________________________
funkyyj@live.com

Re: Re: kafka steams with TimeWindows ,incorrect result

Posted by 杰 杨 <fu...@live.com>.
Hi:
I don't kown what to do with transform function.
and stream is preapred well
like this at blew
key:
44_14_2018-04-27
value:
CountInfo(start=1,active=0,fresh =0)

there is amount data like that。
how I aggregate it with peer 1 seconds using transform function?


________________________________
funkyyj@live.com

From: Guozhang Wang<ma...@gmail.com>
Date: 2018-04-27 01:50
To: users<ma...@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Using a control message to flush results to downstream (in your case to the
result db) looks good to me as well.

On Thu, Apr 26, 2018 at 10:49 AM, Guozhang Wang <wa...@gmail.com> wrote:

> If you're talking about which store to use in your transform function, it
> should be a windowed store.
>
> You can create such a store with the `Stores` factory, and suppose your
> old code has `windowedBy(TimeWindows.of(60000))`, then you can do
>
> `
> windows = TimeWindows.of(60000);
>
> Stores.WindowStoreBuilder(
>         Stores.persistentWindowStore("Counts"),
>         windows.maintainMs(),
>
>         windows.segments,
>
>         windows.size(),
>         true)
>
> )
>
> `
>
>
> Guozhang
>
>
>
> On Thu, Apr 26, 2018 at 4:39 AM, 杰 杨 <fu...@live.com> wrote:
>
>> I return back .
>> Which StateStore could I use for this problem?
>> and another idea .I can send 'flush' message into this topic .
>> when received this message could update results to db.
>> I don't know it's work?
>>
>> ________________________________
>> funkyyj@live.com
>>
>> From: Guozhang Wang<ma...@gmail.com>
>> Date: 2018-03-12 03:58
>> To: users<ma...@kafka.apache.org>
>> Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
>> If you want to strictly "only have one output per window", then for now
>> you'd probably implement that logic using a lower-level "transform"
>> function in which you can schedule a punctuate function to send all the
>> results at the end of a window.
>>
>> If you just want to reduce the amount of data to your sink, but your sink
>> can still handle overwritten records of the same key, you can enlarge the
>> cache size via the cache.max.bytes.buffering config.
>>
>> https://kafka.apache.org/documentation/#streamsconfigs
>>
>> On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨 <fu...@live.com> wrote:
>>
>> > thx for your reply!
>> > I see that it is designed to operate on an infinite, unbounded stream of
>> > data.
>> > now I want to process for  unbounded stream but divided by time
>> interval .
>> > so what can I do for doing this ?
>> >
>> > ________________________________
>> > funkyyj@live.com
>> >
>> > From: Guozhang Wang<ma...@gmail.com>
>> > Date: 2018-03-10 02:50
>> > To: users<ma...@kafka.apache.org>
>> > Subject: Re: kafka steams with TimeWindows ,incorrect result
>> > Hi Jie,
>> >
>> > This is by design of Kafka Streams, please read this doc for more
>> details
>> > (search for "outputs of the Wordcount application is actually a
>> continuous
>> > stream of updates"):
>> >
>> > https://kafka.apache.org/0110/documentation/streams/quickstart
>> >
>> > Note this semantics applies for both windowed and un-windowed tables.
>> >
>> >
>> > Guozhang
>> >
>> > On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <fu...@live.com> wrote:
>> >
>> > > Hi:
>> > > I used TimeWindow for aggregate data in kafka.
>> > >
>> > > this is code snippet ;
>> > >
>> > >   view.flatMap(new MultipleKeyValueMapper(client)
>> > > ).groupByKey(Serialized.with(Serdes.String(),
>> > >                 Serdes.serdeFrom(new CountInfoSerializer(), new
>> > > CountInfoDeserializer())))
>> > >         .windowedBy(TimeWindows.of(60000)).reduce(new
>> > > Reducer<CountInfo>() {
>> > >             @Override
>> > >             public CountInfo apply(CountInfo value1, CountInfo
>> value2) {
>> > >                 return new CountInfo(value1.start + value2.start,
>> > > value1.active + value2.active, value1.fresh + value2.fresh);
>> > >             }
>> > >         }) .toStream(new KeyValueMapper<Windowed<String>, CountInfo,
>> > > String>() {
>> > >             @Override
>> > >             public String apply(Windowed<String> key, CountInfo
>> value) {
>> > >                 return key.key();
>> > >             }
>> > >         }).print(Printed.toSysOut());
>> > >
>> > >         KafkaStreams streams = new KafkaStreams(builder.build(),
>> > > KStreamReducer.getConf());
>> > >         streams.start();
>> > >
>> > > and I test 30000 data in kafka .
>> > > and I print key value .
>> > >
>> > >
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
>> > > 21@1520601300000/1520601360000], CountInfo{start=12179, active=12179,
>> > > fresh=12179}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
>> > 1520601300000/1520601360000],
>> > > CountInfo{start=12179, active=12179, fresh=12179}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
>> > > 21@1520601300000/1520601360000], CountInfo{start=30000, active=30000,
>> > > fresh=30000}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
>> > 1520601300000/1520601360000],
>> > > CountInfo{start=30000, active=30000, fresh=30000}
>> > > why in one window duration will be print two result but not one
>> result ?
>> > >
>> > > ________________________________
>> > > funkyyj@live.com
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
>
> --
> -- Guozhang
>



--
-- Guozhang

Re: Re: kafka streams with TimeWindows ,incorrect result

Posted by 杰 杨 <fu...@live.com>.
the value is defined type which implements Serializer and Deserializer

________________________________
funkyyj@live.com

From: Ted Yu<ma...@gmail.com>
Date: 2018-04-27 16:39
To: users<ma...@kafka.apache.org>; 杰 杨<ma...@live.com>
Subject: Re: Re: kafka streams with TimeWindows ,incorrect result
Noticed a typo in streams in subject.

Corrected it in this reply.

-------- Original message --------
From: 杰 杨 <fu...@live.com>
Date: 4/27/18 1:28 AM (GMT-08:00)
To: 杰 杨 <fu...@live.com>, users <us...@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result

and I checked windowStore interface found it has put method not get method.
in one second
the stream had sample key and different value in it.
and I must update key value which store in it.


________________________________
funkyyj@live.com

From: funkyyj@live.com<ma...@live.com>
Date: 2018-04-27 16:08
To: users<ma...@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Hi:
I don't kown what to do with transform function.
and stream is preapred well
like this at blew
key:
44_14_2018-04-27
value:
CountInfo(start=1,active=0,fresh =0)

there is amount data like that。
how I aggregate it with peer 1 seconds using transform function?


________________________________
funkyyj@live.com

From: Guozhang Wang<ma...@gmail.com>
Date: 2018-04-27 01:50
To: users<ma...@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Using a control message to flush results to downstream (in your case to the
result db) looks good to me as well.

On Thu, Apr 26, 2018 at 10:49 AM, Guozhang Wang <wa...@gmail.com> wrote:

> If you're talking about which store to use in your transform function, it
> should be a windowed store.
>
> You can create such a store with the `Stores` factory, and suppose your
> old code has `windowedBy(TimeWindows.of(60000))`, then you can do
>
> `
> windows = TimeWindows.of(60000);
>
> Stores.WindowStoreBuilder(
>         Stores.persistentWindowStore("Counts"),
>         windows.maintainMs(),
>
>         windows.segments,
>
>         windows.size(),
>         true)
>
> )
>
> `
>
>
> Guozhang
>
>
>
> On Thu, Apr 26, 2018 at 4:39 AM, 杰 杨 <fu...@live.com> wrote:
>
>> I return back .
>> Which StateStore could I use for this problem?
>> and another idea .I can send 'flush' message into this topic .
>> when received this message could update results to db.
>> I don't know it's work?
>>
>> ________________________________
>> funkyyj@live.com
>>
>> From: Guozhang Wang<ma...@gmail.com>
>> Date: 2018-03-12 03:58
>> To: users<ma...@kafka.apache.org>
>> Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
>> If you want to strictly "only have one output per window", then for now
>> you'd probably implement that logic using a lower-level "transform"
>> function in which you can schedule a punctuate function to send all the
>> results at the end of a window.
>>
>> If you just want to reduce the amount of data to your sink, but your sink
>> can still handle overwritten records of the same key, you can enlarge the
>> cache size via the cache.max.bytes.buffering config.
>>
>> https://kafka.apache.org/documentation/#streamsconfigs
>>
>> On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨 <fu...@live.com> wrote:
>>
>> > thx for your reply!
>> > I see that it is designed to operate on an infinite, unbounded stream of
>> > data.
>> > now I want to process for  unbounded stream but divided by time
>> interval .
>> > so what can I do for doing this ?
>> >
>> > ________________________________
>> > funkyyj@live.com
>> >
>> > From: Guozhang Wang<ma...@gmail.com>
>> > Date: 2018-03-10 02:50
>> > To: users<ma...@kafka.apache.org>
>> > Subject: Re: kafka steams with TimeWindows ,incorrect result
>> > Hi Jie,
>> >
>> > This is by design of Kafka Streams, please read this doc for more
>> details
>> > (search for "outputs of the Wordcount application is actually a
>> continuous
>> > stream of updates"):
>> >
>> > https://kafka.apache.org/0110/documentation/streams/quickstart
>> >
>> > Note this semantics applies for both windowed and un-windowed tables.
>> >
>> >
>> > Guozhang
>> >
>> > On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <fu...@live.com> wrote:
>> >
>> > > Hi:
>> > > I used TimeWindow for aggregate data in kafka.
>> > >
>> > > this is code snippet ;
>> > >
>> > >   view.flatMap(new MultipleKeyValueMapper(client)
>> > > ).groupByKey(Serialized.with(Serdes.String(),
>> > >                 Serdes.serdeFrom(new CountInfoSerializer(), new
>> > > CountInfoDeserializer())))
>> > >         .windowedBy(TimeWindows.of(60000)).reduce(new
>> > > Reducer<CountInfo>() {
>> > >             @Override
>> > >             public CountInfo apply(CountInfo value1, CountInfo
>> value2) {
>> > >                 return new CountInfo(value1.start + value2.start,
>> > > value1.active + value2.active, value1.fresh + value2.fresh);
>> > >             }
>> > >         }) .toStream(new KeyValueMapper<Windowed<String>, CountInfo,
>> > > String>() {
>> > >             @Override
>> > >             public String apply(Windowed<String> key, CountInfo
>> value) {
>> > >                 return key.key();
>> > >             }
>> > >         }).print(Printed.toSysOut());
>> > >
>> > >         KafkaStreams streams = new KafkaStreams(builder.build(),
>> > > KStreamReducer.getConf());
>> > >         streams.start();
>> > >
>> > > and I test 30000 data in kafka .
>> > > and I print key value .
>> > >
>> > >
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
>> > > 21@1520601300000/1520601360000], CountInfo{start=12179, active=12179,
>> > > fresh=12179}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
>> > 1520601300000/1520601360000],
>> > > CountInfo{start=12179, active=12179, fresh=12179}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
>> > > 21@1520601300000/1520601360000], CountInfo{start=30000, active=30000,
>> > > fresh=30000}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
>> > 1520601300000/1520601360000],
>> > > CountInfo{start=30000, active=30000, fresh=30000}
>> > > why in one window duration will be print two result but not one
>> result ?
>> > >
>> > > ________________________________
>> > > funkyyj@live.com
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
>
> --
> -- Guozhang
>



--
-- Guozhang

Re: Re: kafka streams with TimeWindows ,incorrect result

Posted by Ted Yu <yu...@gmail.com>.
Noticed a typo in streams in subject.
Corrected it in this reply.
-------- Original message --------From: 杰 杨 <fu...@live.com> Date: 4/27/18  1:28 AM  (GMT-08:00) To: 杰 杨 <fu...@live.com>, users <us...@kafka.apache.org> Subject: Re: Re: kafka steams with TimeWindows ,incorrect result 
and I checked windowStore interface found it has put method not get method.
in one second
the stream had sample key and different value in it.
and I must update key value which store in it.


________________________________
funkyyj@live.com

From: funkyyj@live.com<ma...@live.com>
Date: 2018-04-27 16:08
To: users<ma...@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Hi:
I don't kown what to do with transform function.
and stream is preapred well
like this at blew
key:
44_14_2018-04-27
value:
CountInfo(start=1,active=0,fresh =0)

there is amount data like that。
how I aggregate it with peer 1 seconds using transform function?


________________________________
funkyyj@live.com

From: Guozhang Wang<ma...@gmail.com>
Date: 2018-04-27 01:50
To: users<ma...@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Using a control message to flush results to downstream (in your case to the
result db) looks good to me as well.

On Thu, Apr 26, 2018 at 10:49 AM, Guozhang Wang <wa...@gmail.com> wrote:

> If you're talking about which store to use in your transform function, it
> should be a windowed store.
>
> You can create such a store with the `Stores` factory, and suppose your
> old code has `windowedBy(TimeWindows.of(60000))`, then you can do
>
> `
> windows = TimeWindows.of(60000);
>
> Stores.WindowStoreBuilder(
>         Stores.persistentWindowStore("Counts"),
>         windows.maintainMs(),
>
>         windows.segments,
>
>         windows.size(),
>         true)
>
> )
>
> `
>
>
> Guozhang
>
>
>
> On Thu, Apr 26, 2018 at 4:39 AM, 杰 杨 <fu...@live.com> wrote:
>
>> I return back .
>> Which StateStore could I use for this problem?
>> and another idea .I can send 'flush' message into this topic .
>> when received this message could update results to db.
>> I don't know it's work?
>>
>> ________________________________
>> funkyyj@live.com
>>
>> From: Guozhang Wang<ma...@gmail.com>
>> Date: 2018-03-12 03:58
>> To: users<ma...@kafka.apache.org>
>> Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
>> If you want to strictly "only have one output per window", then for now
>> you'd probably implement that logic using a lower-level "transform"
>> function in which you can schedule a punctuate function to send all the
>> results at the end of a window.
>>
>> If you just want to reduce the amount of data to your sink, but your sink
>> can still handle overwritten records of the same key, you can enlarge the
>> cache size via the cache.max.bytes.buffering config.
>>
>> https://kafka.apache.org/documentation/#streamsconfigs
>>
>> On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨 <fu...@live.com> wrote:
>>
>> > thx for your reply!
>> > I see that it is designed to operate on an infinite, unbounded stream of
>> > data.
>> > now I want to process for  unbounded stream but divided by time
>> interval .
>> > so what can I do for doing this ?
>> >
>> > ________________________________
>> > funkyyj@live.com
>> >
>> > From: Guozhang Wang<ma...@gmail.com>
>> > Date: 2018-03-10 02:50
>> > To: users<ma...@kafka.apache.org>
>> > Subject: Re: kafka steams with TimeWindows ,incorrect result
>> > Hi Jie,
>> >
>> > This is by design of Kafka Streams, please read this doc for more
>> details
>> > (search for "outputs of the Wordcount application is actually a
>> continuous
>> > stream of updates"):
>> >
>> > https://kafka.apache.org/0110/documentation/streams/quickstart
>> >
>> > Note this semantics applies for both windowed and un-windowed tables.
>> >
>> >
>> > Guozhang
>> >
>> > On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <fu...@live.com> wrote:
>> >
>> > > Hi:
>> > > I used TimeWindow for aggregate data in kafka.
>> > >
>> > > this is code snippet ;
>> > >
>> > >   view.flatMap(new MultipleKeyValueMapper(client)
>> > > ).groupByKey(Serialized.with(Serdes.String(),
>> > >                 Serdes.serdeFrom(new CountInfoSerializer(), new
>> > > CountInfoDeserializer())))
>> > >         .windowedBy(TimeWindows.of(60000)).reduce(new
>> > > Reducer<CountInfo>() {
>> > >             @Override
>> > >             public CountInfo apply(CountInfo value1, CountInfo
>> value2) {
>> > >                 return new CountInfo(value1.start + value2.start,
>> > > value1.active + value2.active, value1.fresh + value2.fresh);
>> > >             }
>> > >         }) .toStream(new KeyValueMapper<Windowed<String>, CountInfo,
>> > > String>() {
>> > >             @Override
>> > >             public String apply(Windowed<String> key, CountInfo
>> value) {
>> > >                 return key.key();
>> > >             }
>> > >         }).print(Printed.toSysOut());
>> > >
>> > >         KafkaStreams streams = new KafkaStreams(builder.build(),
>> > > KStreamReducer.getConf());
>> > >         streams.start();
>> > >
>> > > and I test 30000 data in kafka .
>> > > and I print key value .
>> > >
>> > >
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
>> > > 21@1520601300000/1520601360000], CountInfo{start=12179, active=12179,
>> > > fresh=12179}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
>> > 1520601300000/1520601360000],
>> > > CountInfo{start=12179, active=12179, fresh=12179}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
>> > > 21@1520601300000/1520601360000], CountInfo{start=30000, active=30000,
>> > > fresh=30000}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
>> > 1520601300000/1520601360000],
>> > > CountInfo{start=30000, active=30000, fresh=30000}
>> > > why in one window duration will be print two result but not one
>> result ?
>> > >
>> > > ________________________________
>> > > funkyyj@live.com
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
>
> --
> -- Guozhang
>



--
-- Guozhang

Re: Re: kafka steams with TimeWindows ,incorrect result

Posted by 杰 杨 <fu...@live.com>.
and I checked windowStore interface found it has put method not get method.
in one second
the stream had sample key and different value in it.
and I must update key value which store in it.


________________________________
funkyyj@live.com

From: funkyyj@live.com<ma...@live.com>
Date: 2018-04-27 16:08
To: users<ma...@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Hi:
I don't kown what to do with transform function.
and stream is preapred well
like this at blew
key:
44_14_2018-04-27
value:
CountInfo(start=1,active=0,fresh =0)

there is amount data like that。
how I aggregate it with peer 1 seconds using transform function?


________________________________
funkyyj@live.com

From: Guozhang Wang<ma...@gmail.com>
Date: 2018-04-27 01:50
To: users<ma...@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
Using a control message to flush results to downstream (in your case to the
result db) looks good to me as well.

On Thu, Apr 26, 2018 at 10:49 AM, Guozhang Wang <wa...@gmail.com> wrote:

> If you're talking about which store to use in your transform function, it
> should be a windowed store.
>
> You can create such a store with the `Stores` factory, and suppose your
> old code has `windowedBy(TimeWindows.of(60000))`, then you can do
>
> `
> windows = TimeWindows.of(60000);
>
> Stores.WindowStoreBuilder(
>         Stores.persistentWindowStore("Counts"),
>         windows.maintainMs(),
>
>         windows.segments,
>
>         windows.size(),
>         true)
>
> )
>
> `
>
>
> Guozhang
>
>
>
> On Thu, Apr 26, 2018 at 4:39 AM, 杰 杨 <fu...@live.com> wrote:
>
>> I return back .
>> Which StateStore could I use for this problem?
>> and another idea .I can send 'flush' message into this topic .
>> when received this message could update results to db.
>> I don't know it's work?
>>
>> ________________________________
>> funkyyj@live.com
>>
>> From: Guozhang Wang<ma...@gmail.com>
>> Date: 2018-03-12 03:58
>> To: users<ma...@kafka.apache.org>
>> Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
>> If you want to strictly "only have one output per window", then for now
>> you'd probably implement that logic using a lower-level "transform"
>> function in which you can schedule a punctuate function to send all the
>> results at the end of a window.
>>
>> If you just want to reduce the amount of data to your sink, but your sink
>> can still handle overwritten records of the same key, you can enlarge the
>> cache size via the cache.max.bytes.buffering config.
>>
>> https://kafka.apache.org/documentation/#streamsconfigs
>>
>> On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨 <fu...@live.com> wrote:
>>
>> > thx for your reply!
>> > I see that it is designed to operate on an infinite, unbounded stream of
>> > data.
>> > now I want to process for  unbounded stream but divided by time
>> interval .
>> > so what can I do for doing this ?
>> >
>> > ________________________________
>> > funkyyj@live.com
>> >
>> > From: Guozhang Wang<ma...@gmail.com>
>> > Date: 2018-03-10 02:50
>> > To: users<ma...@kafka.apache.org>
>> > Subject: Re: kafka steams with TimeWindows ,incorrect result
>> > Hi Jie,
>> >
>> > This is by design of Kafka Streams, please read this doc for more
>> details
>> > (search for "outputs of the Wordcount application is actually a
>> continuous
>> > stream of updates"):
>> >
>> > https://kafka.apache.org/0110/documentation/streams/quickstart
>> >
>> > Note this semantics applies for both windowed and un-windowed tables.
>> >
>> >
>> > Guozhang
>> >
>> > On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <fu...@live.com> wrote:
>> >
>> > > Hi:
>> > > I used TimeWindow for aggregate data in kafka.
>> > >
>> > > this is code snippet ;
>> > >
>> > >   view.flatMap(new MultipleKeyValueMapper(client)
>> > > ).groupByKey(Serialized.with(Serdes.String(),
>> > >                 Serdes.serdeFrom(new CountInfoSerializer(), new
>> > > CountInfoDeserializer())))
>> > >         .windowedBy(TimeWindows.of(60000)).reduce(new
>> > > Reducer<CountInfo>() {
>> > >             @Override
>> > >             public CountInfo apply(CountInfo value1, CountInfo
>> value2) {
>> > >                 return new CountInfo(value1.start + value2.start,
>> > > value1.active + value2.active, value1.fresh + value2.fresh);
>> > >             }
>> > >         }) .toStream(new KeyValueMapper<Windowed<String>, CountInfo,
>> > > String>() {
>> > >             @Override
>> > >             public String apply(Windowed<String> key, CountInfo
>> value) {
>> > >                 return key.key();
>> > >             }
>> > >         }).print(Printed.toSysOut());
>> > >
>> > >         KafkaStreams streams = new KafkaStreams(builder.build(),
>> > > KStreamReducer.getConf());
>> > >         streams.start();
>> > >
>> > > and I test 30000 data in kafka .
>> > > and I print key value .
>> > >
>> > >
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
>> > > 21@1520601300000/1520601360000], CountInfo{start=12179, active=12179,
>> > > fresh=12179}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
>> > 1520601300000/1520601360000],
>> > > CountInfo{start=12179, active=12179, fresh=12179}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
>> > > 21@1520601300000/1520601360000], CountInfo{start=30000, active=30000,
>> > > fresh=30000}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
>> > 1520601300000/1520601360000],
>> > > CountInfo{start=30000, active=30000, fresh=30000}
>> > > why in one window duration will be print two result but not one
>> result ?
>> > >
>> > > ________________________________
>> > > funkyyj@live.com
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
>
> --
> -- Guozhang
>



--
-- Guozhang

Re: Re: kafka steams with TimeWindows ,incorrect result

Posted by Guozhang Wang <wa...@gmail.com>.
Using a control message to flush results to downstream (in your case to the
result db) looks good to me as well.

On Thu, Apr 26, 2018 at 10:49 AM, Guozhang Wang <wa...@gmail.com> wrote:

> If you're talking about which store to use in your transform function, it
> should be a windowed store.
>
> You can create such a store with the `Stores` factory, and suppose your
> old code has `windowedBy(TimeWindows.of(60000))`, then you can do
>
> `
> windows = TimeWindows.of(60000);
>
> Stores.WindowStoreBuilder(
>         Stores.persistentWindowStore("Counts"),
>         windows.maintainMs(),
>
>         windows.segments,
>
>         windows.size(),
>         true)
>
> )
>
> `
>
>
> Guozhang
>
>
>
> On Thu, Apr 26, 2018 at 4:39 AM, 杰 杨 <fu...@live.com> wrote:
>
>> I return back .
>> Which StateStore could I use for this problem?
>> and another idea .I can send 'flush' message into this topic .
>> when received this message could update results to db.
>> I don't know it's work?
>>
>> ________________________________
>> funkyyj@live.com
>>
>> From: Guozhang Wang<ma...@gmail.com>
>> Date: 2018-03-12 03:58
>> To: users<ma...@kafka.apache.org>
>> Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
>> If you want to strictly "only have one output per window", then for now
>> you'd probably implement that logic using a lower-level "transform"
>> function in which you can schedule a punctuate function to send all the
>> results at the end of a window.
>>
>> If you just want to reduce the amount of data to your sink, but your sink
>> can still handle overwritten records of the same key, you can enlarge the
>> cache size via the cache.max.bytes.buffering config.
>>
>> https://kafka.apache.org/documentation/#streamsconfigs
>>
>> On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨 <fu...@live.com> wrote:
>>
>> > thx for your reply!
>> > I see that it is designed to operate on an infinite, unbounded stream of
>> > data.
>> > now I want to process for  unbounded stream but divided by time
>> interval .
>> > so what can I do for doing this ?
>> >
>> > ________________________________
>> > funkyyj@live.com
>> >
>> > From: Guozhang Wang<ma...@gmail.com>
>> > Date: 2018-03-10 02:50
>> > To: users<ma...@kafka.apache.org>
>> > Subject: Re: kafka steams with TimeWindows ,incorrect result
>> > Hi Jie,
>> >
>> > This is by design of Kafka Streams, please read this doc for more
>> details
>> > (search for "outputs of the Wordcount application is actually a
>> continuous
>> > stream of updates"):
>> >
>> > https://kafka.apache.org/0110/documentation/streams/quickstart
>> >
>> > Note this semantics applies for both windowed and un-windowed tables.
>> >
>> >
>> > Guozhang
>> >
>> > On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <fu...@live.com> wrote:
>> >
>> > > Hi:
>> > > I used TimeWindow for aggregate data in kafka.
>> > >
>> > > this is code snippet ;
>> > >
>> > >   view.flatMap(new MultipleKeyValueMapper(client)
>> > > ).groupByKey(Serialized.with(Serdes.String(),
>> > >                 Serdes.serdeFrom(new CountInfoSerializer(), new
>> > > CountInfoDeserializer())))
>> > >         .windowedBy(TimeWindows.of(60000)).reduce(new
>> > > Reducer<CountInfo>() {
>> > >             @Override
>> > >             public CountInfo apply(CountInfo value1, CountInfo
>> value2) {
>> > >                 return new CountInfo(value1.start + value2.start,
>> > > value1.active + value2.active, value1.fresh + value2.fresh);
>> > >             }
>> > >         }) .toStream(new KeyValueMapper<Windowed<String>, CountInfo,
>> > > String>() {
>> > >             @Override
>> > >             public String apply(Windowed<String> key, CountInfo
>> value) {
>> > >                 return key.key();
>> > >             }
>> > >         }).print(Printed.toSysOut());
>> > >
>> > >         KafkaStreams streams = new KafkaStreams(builder.build(),
>> > > KStreamReducer.getConf());
>> > >         streams.start();
>> > >
>> > > and I test 30000 data in kafka .
>> > > and I print key value .
>> > >
>> > >
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
>> > > 21@1520601300000/1520601360000], CountInfo{start=12179, active=12179,
>> > > fresh=12179}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
>> > 1520601300000/1520601360000],
>> > > CountInfo{start=12179, active=12179, fresh=12179}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
>> > > 21@1520601300000/1520601360000], CountInfo{start=30000, active=30000,
>> > > fresh=30000}
>> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
>> > 1520601300000/1520601360000],
>> > > CountInfo{start=30000, active=30000, fresh=30000}
>> > > why in one window duration will be print two result but not one
>> result ?
>> > >
>> > > ________________________________
>> > > funkyyj@live.com
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang

Re: Re: kafka steams with TimeWindows ,incorrect result

Posted by Guozhang Wang <wa...@gmail.com>.
If you're talking about which store to use in your transform function, it
should be a windowed store.

You can create such a store with the `Stores` factory, and suppose your old
code has `windowedBy(TimeWindows.of(60000))`, then you can do

`
windows = TimeWindows.of(60000);

Stores.WindowStoreBuilder(
        Stores.persistentWindowStore("Counts"),
        windows.maintainMs(),

        windows.segments,

        windows.size(),
        true)

)

`


Guozhang



On Thu, Apr 26, 2018 at 4:39 AM, 杰 杨 <fu...@live.com> wrote:

> I return back .
> Which StateStore could I use for this problem?
> and another idea .I can send 'flush' message into this topic .
> when received this message could update results to db.
> I don't know it's work?
>
> ________________________________
> funkyyj@live.com
>
> From: Guozhang Wang<ma...@gmail.com>
> Date: 2018-03-12 03:58
> To: users<ma...@kafka.apache.org>
> Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
> If you want to strictly "only have one output per window", then for now
> you'd probably implement that logic using a lower-level "transform"
> function in which you can schedule a punctuate function to send all the
> results at the end of a window.
>
> If you just want to reduce the amount of data to your sink, but your sink
> can still handle overwritten records of the same key, you can enlarge the
> cache size via the cache.max.bytes.buffering config.
>
> https://kafka.apache.org/documentation/#streamsconfigs
>
> On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨 <fu...@live.com> wrote:
>
> > thx for your reply!
> > I see that it is designed to operate on an infinite, unbounded stream of
> > data.
> > now I want to process for  unbounded stream but divided by time interval
> .
> > so what can I do for doing this ?
> >
> > ________________________________
> > funkyyj@live.com
> >
> > From: Guozhang Wang<ma...@gmail.com>
> > Date: 2018-03-10 02:50
> > To: users<ma...@kafka.apache.org>
> > Subject: Re: kafka steams with TimeWindows ,incorrect result
> > Hi Jie,
> >
> > This is by design of Kafka Streams, please read this doc for more details
> > (search for "outputs of the Wordcount application is actually a
> continuous
> > stream of updates"):
> >
> > https://kafka.apache.org/0110/documentation/streams/quickstart
> >
> > Note this semantics applies for both windowed and un-windowed tables.
> >
> >
> > Guozhang
> >
> > On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <fu...@live.com> wrote:
> >
> > > Hi:
> > > I used TimeWindow for aggregate data in kafka.
> > >
> > > this is code snippet ;
> > >
> > >   view.flatMap(new MultipleKeyValueMapper(client)
> > > ).groupByKey(Serialized.with(Serdes.String(),
> > >                 Serdes.serdeFrom(new CountInfoSerializer(), new
> > > CountInfoDeserializer())))
> > >         .windowedBy(TimeWindows.of(60000)).reduce(new
> > > Reducer<CountInfo>() {
> > >             @Override
> > >             public CountInfo apply(CountInfo value1, CountInfo value2)
> {
> > >                 return new CountInfo(value1.start + value2.start,
> > > value1.active + value2.active, value1.fresh + value2.fresh);
> > >             }
> > >         }) .toStream(new KeyValueMapper<Windowed<String>, CountInfo,
> > > String>() {
> > >             @Override
> > >             public String apply(Windowed<String> key, CountInfo value)
> {
> > >                 return key.key();
> > >             }
> > >         }).print(Printed.toSysOut());
> > >
> > >         KafkaStreams streams = new KafkaStreams(builder.build(),
> > > KStreamReducer.getConf());
> > >         streams.start();
> > >
> > > and I test 30000 data in kafka .
> > > and I print key value .
> > >
> > >
> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
> > > 21@1520601300000/1520601360000], CountInfo{start=12179, active=12179,
> > > fresh=12179}
> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
> > 1520601300000/1520601360000],
> > > CountInfo{start=12179, active=12179, fresh=12179}
> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
> > > 21@1520601300000/1520601360000], CountInfo{start=30000, active=30000,
> > > fresh=30000}
> > > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
> > 1520601300000/1520601360000],
> > > CountInfo{start=30000, active=30000, fresh=30000}
> > > why in one window duration will be print two result but not one result
> ?
> > >
> > > ________________________________
> > > funkyyj@live.com
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang

Re: Re: kafka steams with TimeWindows ,incorrect result

Posted by 杰 杨 <fu...@live.com>.
I return back .
Which StateStore could I use for this problem?
and another idea .I can send 'flush' message into this topic .
when received this message could update results to db.
I don't know it's work?

________________________________
funkyyj@live.com

From: Guozhang Wang<ma...@gmail.com>
Date: 2018-03-12 03:58
To: users<ma...@kafka.apache.org>
Subject: Re: Re: kafka steams with TimeWindows ,incorrect result
If you want to strictly "only have one output per window", then for now
you'd probably implement that logic using a lower-level "transform"
function in which you can schedule a punctuate function to send all the
results at the end of a window.

If you just want to reduce the amount of data to your sink, but your sink
can still handle overwritten records of the same key, you can enlarge the
cache size via the cache.max.bytes.buffering config.

https://kafka.apache.org/documentation/#streamsconfigs

On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨 <fu...@live.com> wrote:

> thx for your reply!
> I see that it is designed to operate on an infinite, unbounded stream of
> data.
> now I want to process for  unbounded stream but divided by time interval .
> so what can I do for doing this ?
>
> ________________________________
> funkyyj@live.com
>
> From: Guozhang Wang<ma...@gmail.com>
> Date: 2018-03-10 02:50
> To: users<ma...@kafka.apache.org>
> Subject: Re: kafka steams with TimeWindows ,incorrect result
> Hi Jie,
>
> This is by design of Kafka Streams, please read this doc for more details
> (search for "outputs of the Wordcount application is actually a continuous
> stream of updates"):
>
> https://kafka.apache.org/0110/documentation/streams/quickstart
>
> Note this semantics applies for both windowed and un-windowed tables.
>
>
> Guozhang
>
> On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <fu...@live.com> wrote:
>
> > Hi:
> > I used TimeWindow for aggregate data in kafka.
> >
> > this is code snippet ;
> >
> >   view.flatMap(new MultipleKeyValueMapper(client)
> > ).groupByKey(Serialized.with(Serdes.String(),
> >                 Serdes.serdeFrom(new CountInfoSerializer(), new
> > CountInfoDeserializer())))
> >         .windowedBy(TimeWindows.of(60000)).reduce(new
> > Reducer<CountInfo>() {
> >             @Override
> >             public CountInfo apply(CountInfo value1, CountInfo value2) {
> >                 return new CountInfo(value1.start + value2.start,
> > value1.active + value2.active, value1.fresh + value2.fresh);
> >             }
> >         }) .toStream(new KeyValueMapper<Windowed<String>, CountInfo,
> > String>() {
> >             @Override
> >             public String apply(Windowed<String> key, CountInfo value) {
> >                 return key.key();
> >             }
> >         }).print(Printed.toSysOut());
> >
> >         KafkaStreams streams = new KafkaStreams(builder.build(),
> > KStreamReducer.getConf());
> >         streams.start();
> >
> > and I test 30000 data in kafka .
> > and I print key value .
> >
> >
> > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
> > 21@1520601300000/1520601360000], CountInfo{start=12179, active=12179,
> > fresh=12179}
> > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
> 1520601300000/1520601360000],
> > CountInfo{start=12179, active=12179, fresh=12179}
> > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
> > 21@1520601300000/1520601360000], CountInfo{start=30000, active=30000,
> > fresh=30000}
> > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
> 1520601300000/1520601360000],
> > CountInfo{start=30000, active=30000, fresh=30000}
> > why in one window duration will be print two result but not one result ?
> >
> > ________________________________
> > funkyyj@live.com
> >
>
>
>
> --
> -- Guozhang
>



--
-- Guozhang

Re: Re: kafka steams with TimeWindows ,incorrect result

Posted by Guozhang Wang <wa...@gmail.com>.
If you want to strictly "only have one output per window", then for now
you'd probably implement that logic using a lower-level "transform"
function in which you can schedule a punctuate function to send all the
results at the end of a window.

If you just want to reduce the amount of data to your sink, but your sink
can still handle overwritten records of the same key, you can enlarge the
cache size via the cache.max.bytes.buffering config.

https://kafka.apache.org/documentation/#streamsconfigs

On Fri, Mar 9, 2018 at 9:45 PM, 杰 杨 <fu...@live.com> wrote:

> thx for your reply!
> I see that it is designed to operate on an infinite, unbounded stream of
> data.
> now I want to process for  unbounded stream but divided by time interval .
> so what can I do for doing this ?
>
> ________________________________
> funkyyj@live.com
>
> From: Guozhang Wang<ma...@gmail.com>
> Date: 2018-03-10 02:50
> To: users<ma...@kafka.apache.org>
> Subject: Re: kafka steams with TimeWindows ,incorrect result
> Hi Jie,
>
> This is by design of Kafka Streams, please read this doc for more details
> (search for "outputs of the Wordcount application is actually a continuous
> stream of updates"):
>
> https://kafka.apache.org/0110/documentation/streams/quickstart
>
> Note this semantics applies for both windowed and un-windowed tables.
>
>
> Guozhang
>
> On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <fu...@live.com> wrote:
>
> > Hi:
> > I used TimeWindow for aggregate data in kafka.
> >
> > this is code snippet ;
> >
> >   view.flatMap(new MultipleKeyValueMapper(client)
> > ).groupByKey(Serialized.with(Serdes.String(),
> >                 Serdes.serdeFrom(new CountInfoSerializer(), new
> > CountInfoDeserializer())))
> >         .windowedBy(TimeWindows.of(60000)).reduce(new
> > Reducer<CountInfo>() {
> >             @Override
> >             public CountInfo apply(CountInfo value1, CountInfo value2) {
> >                 return new CountInfo(value1.start + value2.start,
> > value1.active + value2.active, value1.fresh + value2.fresh);
> >             }
> >         }) .toStream(new KeyValueMapper<Windowed<String>, CountInfo,
> > String>() {
> >             @Override
> >             public String apply(Windowed<String> key, CountInfo value) {
> >                 return key.key();
> >             }
> >         }).print(Printed.toSysOut());
> >
> >         KafkaStreams streams = new KafkaStreams(builder.build(),
> > KStreamReducer.getConf());
> >         streams.start();
> >
> > and I test 30000 data in kafka .
> > and I print key value .
> >
> >
> > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
> > 21@1520601300000/1520601360000], CountInfo{start=12179, active=12179,
> > fresh=12179}
> > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
> 1520601300000/1520601360000],
> > CountInfo{start=12179, active=12179, fresh=12179}
> > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
> > 21@1520601300000/1520601360000], CountInfo{start=30000, active=30000,
> > fresh=30000}
> > [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@
> 1520601300000/1520601360000],
> > CountInfo{start=30000, active=30000, fresh=30000}
> > why in one window duration will be print two result but not one result ?
> >
> > ________________________________
> > funkyyj@live.com
> >
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang

Re: Re: kafka steams with TimeWindows ,incorrect result

Posted by 杰 杨 <fu...@live.com>.
thx for your reply!
I see that it is designed to operate on an infinite, unbounded stream of data.
now I want to process for  unbounded stream but divided by time interval .
so what can I do for doing this ?

________________________________
funkyyj@live.com

From: Guozhang Wang<ma...@gmail.com>
Date: 2018-03-10 02:50
To: users<ma...@kafka.apache.org>
Subject: Re: kafka steams with TimeWindows ,incorrect result
Hi Jie,

This is by design of Kafka Streams, please read this doc for more details
(search for "outputs of the Wordcount application is actually a continuous
stream of updates"):

https://kafka.apache.org/0110/documentation/streams/quickstart

Note this semantics applies for both windowed and un-windowed tables.


Guozhang

On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <fu...@live.com> wrote:

> Hi:
> I used TimeWindow for aggregate data in kafka.
>
> this is code snippet ;
>
>   view.flatMap(new MultipleKeyValueMapper(client)
> ).groupByKey(Serialized.with(Serdes.String(),
>                 Serdes.serdeFrom(new CountInfoSerializer(), new
> CountInfoDeserializer())))
>         .windowedBy(TimeWindows.of(60000)).reduce(new
> Reducer<CountInfo>() {
>             @Override
>             public CountInfo apply(CountInfo value1, CountInfo value2) {
>                 return new CountInfo(value1.start + value2.start,
> value1.active + value2.active, value1.fresh + value2.fresh);
>             }
>         }) .toStream(new KeyValueMapper<Windowed<String>, CountInfo,
> String>() {
>             @Override
>             public String apply(Windowed<String> key, CountInfo value) {
>                 return key.key();
>             }
>         }).print(Printed.toSysOut());
>
>         KafkaStreams streams = new KafkaStreams(builder.build(),
> KStreamReducer.getConf());
>         streams.start();
>
> and I test 30000 data in kafka .
> and I print key value .
>
>
> [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
> 21@1520601300000/1520601360000], CountInfo{start=12179, active=12179,
> fresh=12179}
> [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000],
> CountInfo{start=12179, active=12179, fresh=12179}
> [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
> 21@1520601300000/1520601360000], CountInfo{start=30000, active=30000,
> fresh=30000}
> [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000],
> CountInfo{start=30000, active=30000, fresh=30000}
> why in one window duration will be print two result but not one result ?
>
> ________________________________
> funkyyj@live.com
>



--
-- Guozhang

Re: kafka steams with TimeWindows ,incorrect result

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Jie,

This is by design of Kafka Streams, please read this doc for more details
(search for "outputs of the Wordcount application is actually a continuous
stream of updates"):

https://kafka.apache.org/0110/documentation/streams/quickstart

Note this semantics applies for both windowed and un-windowed tables.


Guozhang

On Fri, Mar 9, 2018 at 5:36 AM, 杰 杨 <fu...@live.com> wrote:

> Hi:
> I used TimeWindow for aggregate data in kafka.
>
> this is code snippet ;
>
>   view.flatMap(new MultipleKeyValueMapper(client)
> ).groupByKey(Serialized.with(Serdes.String(),
>                 Serdes.serdeFrom(new CountInfoSerializer(), new
> CountInfoDeserializer())))
>         .windowedBy(TimeWindows.of(60000)).reduce(new
> Reducer<CountInfo>() {
>             @Override
>             public CountInfo apply(CountInfo value1, CountInfo value2) {
>                 return new CountInfo(value1.start + value2.start,
> value1.active + value2.active, value1.fresh + value2.fresh);
>             }
>         }) .toStream(new KeyValueMapper<Windowed<String>, CountInfo,
> String>() {
>             @Override
>             public String apply(Windowed<String> key, CountInfo value) {
>                 return key.key();
>             }
>         }).print(Printed.toSysOut());
>
>         KafkaStreams streams = new KafkaStreams(builder.build(),
> KStreamReducer.getConf());
>         streams.start();
>
> and I test 30000 data in kafka .
> and I print key value .
>
>
> [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
> 21@1520601300000/1520601360000], CountInfo{start=12179, active=12179,
> fresh=12179}
> [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000],
> CountInfo{start=12179, active=12179, fresh=12179}
> [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09_hour_
> 21@1520601300000/1520601360000], CountInfo{start=30000, active=30000,
> fresh=30000}
> [KTABLE-TOSTREAM-0000000007]: [99999_99999_2018-03-09@1520601300000/1520601360000],
> CountInfo{start=30000, active=30000, fresh=30000}
> why in one window duration will be print two result but not one result ?
>
> ________________________________
> funkyyj@live.com
>



-- 
-- Guozhang