Posted to users@kafka.apache.org by Murilo Tavares <mu...@gmail.com> on 2019/01/26 02:32:55 UTC

Bug in TopologyTestDriver

Hi
I am new to this mailing list, so I'm not sure if this is the right place to
send this. Please let me know if it's not.

I believe I found a bug in the TopologyTestDriver.

I have a topology that aggregates on a KTable. This is a generic method I
created to build this topology for several of my topics.

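import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;

public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table,
        Function<B, C> getKeyFunction, Serde<C> keySerde,
        Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
    return table
            .groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
                    Serialized.with(keySerde, valueSerde))
            .aggregate(() -> new HashSet<>(),
                    (key, newValue, agg) -> { // adder: replace any equal element
                        agg.remove(newValue);
                        agg.add(newValue);
                        return agg;
                    },
                    (key, oldValue, agg) -> { // subtractor: drop the old value
                        agg.remove(oldValue);
                        return agg;
                    },
                    Materialized.with(keySerde, aggregatedSerde));
}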

This works well against a real Kafka cluster, but not when testing with
`TopologyTestDriver`.

In both scenarios, when an update arrives, the subtractor is called first
and then the adder. The problem is that with the TopologyTestDriver, two
messages are emitted per update: one after the subtractor call and another
after the adder call. Moreover, the message emitted after the subtractor and
before the adder carries an incorrect intermediate state.
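
To make the reported sequence concrete, here is a minimal plain-Java simulation (no Kafka dependency) of what the aggregate sees for a single update. It assumes the old and new values map to the same grouping key and that every aggregator call produces a downstream record, as observed with TopologyTestDriver. The class UpdateSimulation and its methods are hypothetical: they only mirror the adder and subtractor lambdas from groupTable and are not Kafka's implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical plain-Java simulation, not Kafka Streams code.
// Assumption: each aggregator call emits one downstream record, and the
// old and new values share the same grouping key.
public class UpdateSimulation {

    // Mirrors the adder lambda from groupTable: remove, then re-add the value.
    static <B> Set<B> adder(B newValue, Set<B> agg) {
        agg.remove(newValue);
        agg.add(newValue);
        return agg;
    }

    // Mirrors the subtractor lambda from groupTable.
    static <B> Set<B> subtractor(B oldValue, Set<B> agg) {
        agg.remove(oldValue);
        return agg;
    }

    // Runs the subtractor, then the adder, for one update and records a copy
    // of the aggregate after each call (one copy per simulated emitted record).
    static <B> List<Set<B>> update(Set<B> agg, B oldValue, B newValue) {
        List<Set<B>> emitted = new ArrayList<>();
        emitted.add(new HashSet<>(subtractor(oldValue, agg)));
        emitted.add(new HashSet<>(adder(newValue, agg)));
        return emitted;
    }

    public static void main(String[] args) {
        Set<String> agg = new HashSet<>(Set.of("v1"));
        System.out.println(update(agg, "v1", "v2"));
    }
}
```

Running main prints [[], [v2]]: the first record is the aggregate after the subtractor alone (the intermediate state), and the second is the expected final result.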

I created a test case in GitHub to illustrate the issue:
https://github.com/mulho/topology-testcase

Could anyone confirm whether this is a bug? I've tested it with both Kafka
versions 2.0.1 and 2.1.0.

Thanks

Murilo

Re: Bug in TopologyTestDriver

Posted by "Matthias J. Sax" <ma...@confluent.io>.
This question is cross-posted on Stack Overflow. I answered it there:

https://stackoverflow.com/questions/54372134/topologytestdriver-sending-incorrect-message-on-ktable-aggregations
