Posted to users@kafka.apache.org by Shankar Mane <ss...@gmail.com> on 2022/05/17 10:20:51 UTC

kafka stream - sliding window - getting unexpected output

Hi All,

Our use case needs a sliding window: at any point, whenever a user performs
an action at time t1, we would like to see their activity in [t1 - 24 hours, t1].
We use this to show the user some recommendations.
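A minimal plain-Java sketch of the semantics described above (JDK only, not Kafka Streams). For each action at t1, it aggregates the same user's actions in the inclusive range [t1 - lookback, t1], which yields exactly one result per action. `LookbackAggregator` and its method are hypothetical names. The sketch assumes each user's records arrive in timestamp order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not Kafka Streams): for each action, aggregate the same
// user's actions whose timestamps fall in the inclusive range [t1 - lookbackMs, t1].
// Assumes each user's actions arrive in non-decreasing timestamp order.
class LookbackAggregator {
    private final long lookbackMs;
    // per user: {timestamp, value} pairs that may still be inside the range
    private final Map<String, Deque<long[]>> history = new HashMap<>();

    LookbackAggregator(long lookbackMs) {
        this.lookbackMs = lookbackMs;
    }

    /** Records one action and returns {count, sum} over [timestamp - lookbackMs, timestamp]. */
    long[] onAction(String userId, long timestamp, long value) {
        Deque<long[]> events = history.computeIfAbsent(userId, k -> new ArrayDeque<>());
        events.addLast(new long[] {timestamp, value});
        // evict actions that are older than the start of the range
        while (events.peekFirst()[0] < timestamp - lookbackMs) {
            events.removeFirst();
        }
        long sum = 0;
        for (long[] e : events) {
            sum += e[1];
        }
        return new long[] {events.size(), sum};
    }
}
```

For the 24-hour requirement the lookback would be `Duration.ofHours(24).toMillis()`. A 5000 ms lookback is handy for testing against the sample input further down. With that setting, every action from the third one onward sees count 3 and sum 30.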



-- The code is ready and runs without any errors.
-- Aggregations happen as expected.
-- But the output is unexpected: as the window slides, I am getting
intermediate aggregated records mixed in with the final aggregated output.

Could someone please help me here? What can I do to get ONLY the final
aggregated output?


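Code snippet:
________________________________________________________________

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Windowed;

builder.stream(tempReadingTopic, Consumed.with(stringSerde, inputSerde))
        .filter((k, v) -> v != null)
        // re-key each reading by user id so that windows are per user
        .map((k, v) -> KeyValue.pair(v.getUserId(), v))
        //.through("slidingbykey", Produced.with(Serdes.String(), inputSerde))
        .groupByKey()
        // records in one window are at most 5000 ms apart; grace period = windowDuration
        .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), windowDuration))
        .aggregate(OutputPojo::new, (k, tr, out) -> {
            out.setUserId(tr.getUserId());
            out.setCount(out.getCount() + 1);
            out.setSum(out.getSum() + tr.getInt4());
            out.setUuid(tr.getUuid());
            out.setStrTime(TimestampLongToString.getTime(tr.getTimestamp()));
            waitForMs(200); // added delay just for analysing output
            return out;
        }, Materialized.with(stringSerde, outputSerde))
        .suppress(Suppressed.untilTimeLimit(windowDuration, Suppressed.BufferConfig.unbounded()))
        .toStream()
        // drop the window from the key, keeping only the user id
        .map((Windowed<String> key, OutputPojo out) -> new KeyValue<>(key.key(), out))
        .print(Printed.toSysOut());
        //.to(aveTempOutputTopic, Produced.with(stringSerde, outputSerde));
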
________________________________________________________________
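The snippet above suppresses with `Suppressed.untilTimeLimit(...)`. As its Javadoc describes, that operator waits a given time after the first buffered update for a key, then forwards whatever the latest value is at that point. It does not wait for the window to close, so a window that keeps receiving records can be forwarded more than once. The JDK-only model below is a simplified, hypothetical illustration of that rate-limiting behavior. `TimeLimitBufferModel` is not a Kafka class. Like Kafka Streams, the model advances time only from record timestamps (stream time).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model (not Kafka code) of time-limit suppression: the first update
// for a key starts a timer, later updates replace the buffered value without
// restarting it, and once stream time passes the timer the latest value is
// forwarded and the key leaves the buffer (a later update buffers it again).
class TimeLimitBufferModel {
    private final long timeLimitMs;
    private final Map<String, Long> firstSeen = new LinkedHashMap<>(); // key -> timer start
    private final Map<String, Integer> latest = new LinkedHashMap<>(); // key -> newest value
    private long streamTime = Long.MIN_VALUE;
    final List<String> emitted = new ArrayList<>();

    TimeLimitBufferModel(long timeLimitMs) {
        this.timeLimitMs = timeLimitMs;
    }

    void update(String key, int value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp); // stream time never goes back
        firstSeen.putIfAbsent(key, timestamp);        // timer is not restarted by later updates
        latest.put(key, value);
        // forward every key whose timer has expired
        Iterator<Map.Entry<String, Long>> it = firstSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() <= streamTime - timeLimitMs) {
                emitted.add(e.getKey() + "=" + latest.remove(e.getKey()));
                it.remove();
            }
        }
    }
}
```

With a 3000 ms limit, feeding window `w1` the values 1, 2 and 3 at t = 0, 2000 and 4000 forwards `w1=3`. A further update of 4 at t = 6000 is forwarded later as `w1=4`. The same window therefore appears twice, first with an intermediate value, which is the kind of repeated, partial output described in this post.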


Input data:

for i in {1..10}; do sleep 1s; python3 del.py 1001 10; sleep 1s; done
{'userId': '1001', 'timestamp': 1652781716234, 'int4': 10, 'uuid': '64f019ee-9cf4-427d-b4c9-f2b5f88820e1'}
{'userId': '1001', 'timestamp': 1652781718436, 'int4': 10, 'uuid': 'cf173b3e-c34f-470a-ba15-ef648d0be8b9'}
{'userId': '1001', 'timestamp': 1652781720634, 'int4': 10, 'uuid': '48d2b4ea-052d-42fa-a998-0216d928c034'}
{'userId': '1001', 'timestamp': 1652781722832, 'int4': 10, 'uuid': '55a6c26c-3d2c-46f1-ab3c-04927f660cbe'}
{'userId': '1001', 'timestamp': 1652781725029, 'int4': 10, 'uuid': 'dbfd8cee-565d-496b-b5a8-773ae64bc518'}
{'userId': '1001', 'timestamp': 1652781727227, 'int4': 10, 'uuid': '135dc5cd-50cb-467b-9e63-300fdeedaf75'}
{'userId': '1001', 'timestamp': 1652781729425, 'int4': 10, 'uuid': '66d8e3c7-8f63-43ca-acf1-e39619bf33a0'}
{'userId': '1001', 'timestamp': 1652781731623, 'int4': 10, 'uuid': 'f037712b-42a5-4449-bcc2-cf6eafddf5ad'}
{'userId': '1001', 'timestamp': 1652781733820, 'int4': 10, 'uuid': '7baa4254-b9da-43dc-bbb7-4caede578aeb'}
{'userId': '1001', 'timestamp': 1652781736018, 'int4': 10, 'uuid': '16541989-f3ba-49f6-bd31-bf8a75ba8eac'}


________________________________________________________________


Output (*unexpected*), captured at each 1 s slide of the window (while the
input data is published at 2 s intervals):

[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:31:28.263, uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34)  ----> seems to be an older UUID
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-17 15:31:28.263, uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34)

[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-17 15:31:56.234, uuid=64f019ee-9cf4-427d-b4c9-f2b5f88820e1)

[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:31:58.436, uuid=cf173b3e-c34f-470a-ba15-ef648d0be8b9)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)

[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)

[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:05.029, uuid=dbfd8cee-565d-496b-b5a8-773ae64bc518)

[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:05.029, uuid=dbfd8cee-565d-496b-b5a8-773ae64bc518)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:07.227, uuid=135dc5cd-50cb-467b-9e63-300fdeedaf75)

[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:07.227, uuid=135dc5cd-50cb-467b-9e63-300fdeedaf75)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:09.425, uuid=66d8e3c7-8f63-43ca-acf1-e39619bf33a0)

[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:09.425, uuid=66d8e3c7-8f63-43ca-acf1-e39619bf33a0)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:11.623, uuid=f037712b-42a5-4449-bcc2-cf6eafddf5ad)

[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:11.623, uuid=f037712b-42a5-4449-bcc2-cf6eafddf5ad)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:13.820, uuid=7baa4254-b9da-43dc-bbb7-4caede578aeb)

[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:13.820, uuid=7baa4254-b9da-43dc-bbb7-4caede578aeb)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:16.018, uuid=16541989-f3ba-49f6-bd31-bf8a75ba8eac)


Regards,
Shankar

Re: kafka stream - sliding window - getting unexpected output

Posted by "Matthias J. Sax" <mj...@apache.org>.
Not sure at the moment.

It seems you are printing the timestamp extracted from the payload:

> out.setStrTime(TimestampLongToString.getTime(tr.getTimestamp()));

Does this timestamp really map to the window?

You remove the window information, so maybe you are looking at the wrong
data?

> .map((Windowed<String> key, OutputPojo out) -> {
>        return new KeyValue<>(key.key(),out) ;
>      })
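Listing the windows that a sliding-window aggregation produces shows one way several rows can share a uuid. Assuming the semantics described in KIP-450, Kafka Streams gives each record at time t two windows (both bounds inclusive):

- a left window [t - d, t];
- a right window [t + 1, t + 1 + d], created only when a later record falls inside it.

The JDK-only model below is a simplified, hypothetical sketch of that rule. It assumes in-order records and ignores grace and late data. `SlidingWindowModel` is not a Kafka class. In the real topology, printing `key.window().start()` and `key.window().end()` instead of only `key.key()` would show which window each row belongs to.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of sliding-window membership (in-order records,
// grace period ignored): each record at t has a left window [t - d, t] and, if a
// later record falls inside it, a right window [t + 1, t + 1 + d]; bounds inclusive.
class SlidingWindowModel {
    record Result(long start, long end, int count, long lastTimestamp) {}

    static List<Result> finalWindows(long[] timestamps, long timeDifferenceMs) {
        List<Result> results = new ArrayList<>();
        for (long t : timestamps) {
            addIfNonEmpty(results, timestamps, t - timeDifferenceMs, t);         // left window
            addIfNonEmpty(results, timestamps, t + 1, t + 1 + timeDifferenceMs); // right window
        }
        return results;
    }

    // counts the records inside [start, end] and remembers the latest one
    private static void addIfNonEmpty(List<Result> out, long[] ts, long start, long end) {
        int count = 0;
        long last = Long.MIN_VALUE;
        for (long t : ts) {
            if (t >= start && t <= end) {
                count++;
                last = Math.max(last, t);
            }
        }
        if (count > 0) {
            out.add(new Result(start, end, count, last));
        }
    }
}
```

Take the first five sample timestamps with d = 5000 ms. Two windows end with the third record: that record's left window (count 3) and the first record's right window (count 2). This matches the count=3/count=2 pair carrying uuid 48d2b4ea-052d-42fa-a998-0216d928c034 in the first output. The count=5/count=4 pairs seen later with `untilWindowCloses` would be consistent with the same effect, each row coming from a different window.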


For the input: do you use a custom timestamp extractor that uses the
payload timestamp? If not, do the record timestamp and the payload
timestamp match?
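A quick JDK-only check of the arithmetic behind `strTime`. Rendering the first payload timestamp, 1652781716234, in Asia/Kolkata (UTC+05:30) gives exactly the printed 15:31:56.234. That time zone is inferred from the output and is not stated in the thread. So `strTime` reflects only the payload field. `PayloadTimeCheck` is a hypothetical stand-in for the poster's `TimestampLongToString` helper.

Whether Kafka Streams windows by that same value depends on the timestamp extractor. By default it uses the Kafka record's own timestamp; a custom `TimestampExtractor` can be supplied, for example via `Consumed.withTimestampExtractor(...)`.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical stand-in for the poster's TimestampLongToString helper:
// formats an epoch-millisecond timestamp in the given time zone.
class PayloadTimeCheck {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    static String format(long epochMillis, String zone) {
        return FORMAT.withZone(ZoneId.of(zone)).format(Instant.ofEpochMilli(epochMillis));
    }
}
```

`PayloadTimeCheck.format(1652781716234L, "UTC")` gives 2022-05-17 10:01:56.234, the same instant five and a half hours earlier on the clock.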


-Matthias


On 5/18/22 11:32 PM, Shankar Mane wrote:
> @Matthias J. Sax / All
>
> I have added the line below:
>
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>
> Here is the output. For uuid *2cbef750-325b-4a2f-ac39-b2c23fa0313f* I
> expected a single output, but that is not the case here. Which of those two
> rows for the same uuid is the final output?

Re: kafka stream - sliding window - getting unexpected output

Posted by Shankar Mane <ss...@gmail.com>.
@Matthias J. Sax / All

I have added the line below:

    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))

Here is the output. For uuid *2cbef750-325b-4a2f-ac39-b2c23fa0313f* I
expected a single output, but that is not the case here. Which of those two
rows for the same uuid is the final output?

[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-19 11:48:08.128, uuid=fb6bea5f-8fd0-4c03-8df3-aaf392f04a5a)

[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-19 11:48:10.328, uuid=b4ab837f-b10a-452d-a663-719215d2992f)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-19 11:48:12.527, uuid=8fa1b621-c967-4770-9f85-9fd84999c97c)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:14.726, uuid=1fc21253-7859-45ef-969e-82ed596c4fa0)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:16.925, uuid=*2cbef750-325b-4a2f-ac39-b2c23fa0313f*)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:16.925, uuid=*2cbef750-325b-4a2f-ac39-b2c23fa0313f*)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:19.124, uuid=8f72454c-ec02-42c1-84e1-bcf262db436f)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:19.124, uuid=8f72454c-ec02-42c1-84e1-bcf262db436f)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:21.322, uuid=e4e73232-dbd4-45b4-aae9-f3bd2c6e54b4)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:21.322, uuid=e4e73232-dbd4-45b4-aae9-f3bd2c6e54b4)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:23.522, uuid=fc49906e-54d0-4e80-b394-3ebaf6e74eaa)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:23.522, uuid=fc49906e-54d0-4e80-b394-3ebaf6e74eaa)
[KSTREAM-MAP-0000000011]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:25.721, uuid=fbe62fa4-e7c4-437f-b976-0bb7ae0c4390)


On Wed, May 18, 2022 at 10:21 PM Matthias J. Sax <mj...@apache.org> wrote:

> Emitting intermediate results is by design.
>
> If you don't want intermediate results, you can add `suppress()`
> after the aggregation and configure it to only "emit on window close".
>
> -Matthias

Re: kafka stream - sliding window - getting unexpected output

Posted by "Matthias J. Sax" <mj...@apache.org>.
Emitting intermediate results is by design.

If you don't want intermediate results, you can add `suppress()`
after the aggregation and configure it to only "emit on window close".
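In Kafka Streams this is `.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))`. The JDK-only model below is a simplified, hypothetical sketch of that behavior; `WindowCloseBufferModel` is not a Kafka class. Updates for a window overwrite each other in a buffer. The window is forwarded exactly once, when stream time (the largest record timestamp seen so far) reaches the window end plus grace. Updates for an already-closed window are ignored here. In Kafka Streams, the windowed aggregation drops such late records before they reach suppress.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model of "emit on window close" suppression:
// only the newest aggregate per window is buffered, and each window is
// forwarded once, when stream time >= window end + grace.
class WindowCloseBufferModel {
    private final long graceMs;
    private final Map<String, Long> windowEnds = new LinkedHashMap<>(); // buffered window -> end
    private final Map<String, Integer> latest = new LinkedHashMap<>();  // buffered window -> newest aggregate
    private final Set<String> closedWindows = new HashSet<>();
    private long streamTime = Long.MIN_VALUE;
    final List<String> emitted = new ArrayList<>();

    WindowCloseBufferModel(long graceMs) {
        this.graceMs = graceMs;
    }

    void update(String window, long windowEnd, int aggregate, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        if (!closedWindows.contains(window)) { // late updates to closed windows are ignored
            windowEnds.put(window, windowEnd);
            latest.put(window, aggregate);     // intermediate values are overwritten
        }
        Iterator<Map.Entry<String, Long>> it = windowEnds.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() + graceMs <= streamTime) {
                emitted.add(e.getKey() + "=" + latest.remove(e.getKey()));
                closedWindows.add(e.getKey());
                it.remove();
            }
        }
    }
}
```

Two consequences follow. First, results appear only when stream time advances, that is, when newer records arrive, so the most recent windows stay buffered until more input comes in. Second, suppression is per window, not per input record. With sliding windows one record belongs to several windows, so several final rows can still carry the same last record.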

-Matthias

On 5/17/22 3:20 AM, Shankar Mane wrote:
> Hi All,
> 
> Our use case needs a sliding window: at any point, whenever a user performs
> an action at time t1, we would like to see their activity in [t1 - 24 hours, t1].
> We use this to show the user some recommendations.
>
> -- The code is ready and runs without any errors.
> -- Aggregations happen as expected.
> -- But the output is unexpected: as the window slides, I am getting
> intermediate aggregated records mixed in with the final aggregated output.
>
> Could someone please help me here? What can I do to get ONLY the final
> aggregated output?