Posted to users@kafka.apache.org by Vincenzo D'Amore <v....@gmail.com> on 2019/01/25 17:31:25 UTC

Kafka streams messages duplicates with non-overlapping gap-less windows

Hi all,

I'm writing because I've been struggling for a couple of days to
understand why I get so many duplicates when processing messages with
Kafka Streams.

I have an input topic where I'm 100% sure there are no duplicate keys or
messages.

During processing I have to aggregate the messages using
groupByKey, windowedBy and aggregate:

                .map((v1, v2) -> {
                    Long currentSecond = System.currentTimeMillis() / 500;
                    return new KeyValue<>(currentSecond.toString(), v2);
                })
                .groupByKey(Serialized.with(Serdes.String(), new JsonSerde()))
                .windowedBy(TimeWindows.of(500))
                .aggregate(() -> new ArrayList<StreamEntry<String, JsonNode>>(),
                        (aggKey, newValue, aggValue) -> {
                            final StreamEntry<String, JsonNode> kvSimpleEntry = new StreamEntry<>(aggKey, newValue);
                            aggValue.add(kvSimpleEntry);
                            return aggValue;
                        }, Materialized.with(Serdes.String(), new ListKVJsonNodeSerde()))

I'm 100% sure there are no duplicates at this stage either, but
surprisingly I then see that mapValues is called with the same
messages more than once, even hundreds of times.

               .mapValues(vv -> {
                   // here the list vv contains the many
                   ....
               })

Looking around I've found this project that seems to reproduce the problem:
https://github.com/westec/ks-aggregate-debug

Given that I am using non-overlapping, gap-less windows in Kafka Streams,
shouldn't the correct output contain NO duplicate messages between windows?
Any idea why I'm seeing duplicates?


-- 
Vincenzo D'Amore

Re: Kafka streams messages duplicates with non-overlapping gap-less windows

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Your observation is correct and it's expected behavior.

As mentioned originally, Kafka Streams follows a continuous update
processing model, i.e., each time an input record is processed, the
aggregation result is updated and emitted downstream. (Did you read the
blog post?)

Thus, if you aggregate 5 records into an array, the `KTable#toStream()`
operation returns:

input: <k,a>, <k,b>, <k,c>, <k,d>, <k,e>
output: <k,[a]>, <k,[a,b]>, <k,[a,b,c]>, <k,[a,b,c,d]>, <k,[a,b,c,d,e]>
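
To make the sequence above concrete, here is a minimal plain-Java model of
it. It does not use Kafka at all; the class and method names are
hypothetical and only mimic how an append-to-list aggregation emits a new
result for every input record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ContinuousUpdateDemo {

    /**
     * Models an aggregation for a single key that appends each value to a
     * list and emits a snapshot of the updated list after every record.
     */
    static List<List<String>> emitUpdates(List<String> inputs) {
        List<String> aggregate = new ArrayList<>();      // state for one key
        List<List<String>> emitted = new ArrayList<>();  // what downstream sees
        for (String value : inputs) {
            aggregate.add(value);
            // Copy the state so that later additions do not alter this snapshot.
            emitted.add(new ArrayList<>(aggregate));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // prints [[a], [a, b], [a, b, c], [a, b, c, d], [a, b, c, d, e]]
        System.out.println(emitUpdates(Arrays.asList("a", "b", "c", "d", "e")));
    }
}
```

Every emitted list repeats the elements of the previous one, which is why a
downstream `mapValues` sees the same messages many times.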

You might not see all updates due to caching:
https://kafka.apache.org/21/documentation/streams/developer-guide/memory-mgmt.html
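
For illustration, a sketch of the two settings that the linked page
describes as controlling how many intermediate updates are forwarded:
`cache.max.bytes.buffering` and `commit.interval.ms` (config names as used
in the 2.1 documentation). The helper name and the values are assumptions;
a real application would merge these into its full Streams configuration.

```java
import java.util.Properties;

final class CacheSettingsDemo {

    /**
     * Builds only the cache-related settings. A cache size of 0 bytes
     * disables record caching, so every update is forwarded downstream.
     * A larger cache or a longer commit interval lets more updates for the
     * same key be consolidated before they are forwarded.
     */
    static Properties cacheSettings(long cacheBytes, long commitIntervalMs) {
        Properties props = new Properties();
        props.setProperty("cache.max.bytes.buffering", Long.toString(cacheBytes));
        props.setProperty("commit.interval.ms", Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values: no caching, commit every second.
        System.out.println(cacheSettings(0L, 1000L));
    }
}
```

Note that caching only reduces the number of updates; it does not
guarantee exactly one result per window.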

Kafka Streams 2.1 added a new `suppress()` operator that you can
use to get only one result per window:
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
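
In the DSL this is written as
`suppress(Suppressed.untilWindowCloses(...))` on the windowed result; see
the linked page for the exact usage. The plain-Java model below is only a
sketch of the idea, not the operator's implementation: it buffers values
per 500 ms window and emits a window once the highest timestamp seen so
far reaches the window end. It assumes a single key, a zero grace period,
and non-negative timestamps, and all names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class FinalResultsDemo {
    static final long WINDOW_SIZE_MS = 500;

    // Window start -> values buffered for that still-open window.
    private final TreeMap<Long, List<String>> open = new TreeMap<>();
    // Highest record timestamp observed so far.
    private long streamTime = -1;

    /** Feeds one record and returns the windows that became final because of it. */
    List<List<String>> process(String value, long timestampMs) {
        List<List<String>> closed = new ArrayList<>();
        long windowStart = timestampMs - (timestampMs % WINDOW_SIZE_MS);
        if (windowStart + WINDOW_SIZE_MS <= streamTime) {
            return closed; // the window was already emitted: drop the late record
        }
        open.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(value);
        streamTime = Math.max(streamTime, timestampMs);

        // Emit, in window order, every window whose end has been reached.
        Iterator<Map.Entry<Long, List<String>>> it = open.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() + WINDOW_SIZE_MS > streamTime) {
                break;
            }
            closed.add(entry.getValue());
            it.remove();
        }
        return closed;
    }
}
```

In this model a window is emitted only when a later record advances the
time, so the most recent window stays buffered until more input arrives.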


-Matthias




Re: Kafka streams messages duplicates with non-overlapping gap-less windows

Posted by Vincenzo D'Amore <v....@gmail.com>.
Hi Matthias, thanks for your reply. Let me explain better what I'm
trying to say. In the meantime I've played with this problem and I think
I now have a clearer view, though I still don't have a solution.

I have an input topic A, which is a stream of messages where each message
contains just an ID. There can be thousands or even millions of these IDs
(messages), but in my test (proof of concept) they are all different.

In order to process them, for each one I have to retrieve some data that is
stored in a NoSQL database. As you can understand, querying one ID at a time
is not a good solution for performance reasons, so I need to
aggregate them, and here comes the problem.

So from the source topic A I have created a new topic B, where each
message now has a key that is a number which changes every X milliseconds
(say 500 ms).
Now I can group by key and aggregate. I assume that each list
returned by aggregate() does not contain duplicates.
The output of this aggregation is saved in topic C.
Topic C contains arrays of IDs of different sizes, and the key is the number
created to group them.
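
The time-based key described above can be sketched in plain Java as
follows, mirroring the `System.currentTimeMillis() / 500` expression from
the first message. The class and method names are hypothetical.

```java
final class BucketKeyDemo {
    static final long BUCKET_MS = 500;

    /** Grouping key: the index of the 500 ms bucket containing the given instant. */
    static String bucketKey(long epochMillis) {
        return Long.toString(epochMillis / BUCKET_MS);
    }

    public static void main(String[] args) {
        // The key changes every 500 ms of wall-clock time.
        System.out.println(bucketKey(System.currentTimeMillis()));
    }
}
```

One thing to keep in mind: this key is derived from the wall clock at
processing time, which may differ from the record timestamp that the
windowing step uses.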

And here is my big surprise: in topic C there are a lot of IDs that
are present in several messages at the same time.
Those messages have the same key but arrays of IDs of different sizes, and
each array partially contains IDs present in other messages.

I suppose this should be impossible.

So, for example, if I have a stream with the following list of messages:

key - value
--------------
0 - 1
0 - 2
0 - 3
0 - 4
0 - 5
1 - 6
1 - 7
1 - 8
1 - 9
1 - 10

I suppose the groupByKey() and aggregate() should return

key - value
----------------
0 - [1,2,3,4,5]
1 - [6,7,8,9,10]
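
This expectation corresponds to a batch-style group-by, where all input is
available up front and each key yields exactly one final list. A minimal
plain-Java sketch of that (non-streaming) behavior, with hypothetical names
and the sample data from above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class BatchGroupByDemo {

    /** Groups {key, value} pairs into one list per key, in key order. */
    static Map<Integer, List<Integer>> groupOnce(int[][] keyValuePairs) {
        Map<Integer, List<Integer>> result = new TreeMap<>();
        for (int[] kv : keyValuePairs) {
            result.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        int[][] input = {
            {0, 1}, {0, 2}, {0, 3}, {0, 4}, {0, 5},
            {1, 6}, {1, 7}, {1, 8}, {1, 9}, {1, 10}
        };
        // prints {0=[1, 2, 3, 4, 5], 1=[6, 7, 8, 9, 10]}
        System.out.println(groupOnce(input));
    }
}
```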

But instead I found something like:

key - value
----------------
0 - [1,2,3,4,5]
0 - [2,3,4,5]
1 - [6,7,8,9]
1 - [6,7,8,9,10]

So the question is: did I do something wrong when trying to aggregate them?
How can I avoid these duplicates?



-- 
Vincenzo D'Amore

Re: Kafka streams messages duplicates with non-overlapping gap-less windows

Posted by "Matthias J. Sax" <ma...@confluent.io>.
I am not 100% sure what you mean by

>> I've a input topic where I'm 100% sure there are no duplicate keys or messages

If this is the case (i.e., each key is unique), it would imply that each
window contains exactly one record per key. Hence, why do you aggregate?
Each aggregate would consist of only one message, making an aggregation
step unnecessary.

Can you be a little bit more specific and provide a sample input
(key,value,timestamp), observed output, and expected output?

I suspect (but I am not sure) that you might "struggle" with Kafka
Streams' continuous output model. Maybe this blog post sheds some light:
https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/


-Matthias
