Posted to users@kafka.apache.org by Murilo Tavares <mu...@gmail.com> on 2019/01/26 02:32:55 UTC
Bug in TopologyTestDriver
Hi
I am new to this mailing list, so I'm not sure if this is the right place to
send this. Please let me know if it's not.
I believe I found a bug in the TopologyTestDriver.
I have a topology that aggregates on a KTable. This is a generic method I
created to build this topology for the different topics I have.
    public static <A, B, C> KTable<C, Set<B>> groupTable(
            KTable<A, B> table, Function<B, C> getKeyFunction,
            Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
        return table
            .groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
                     Serialized.with(keySerde, valueSerde))
            .aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
                agg.remove(newValue);
                agg.add(newValue);
                return agg;
            }, (key, oldValue, agg) -> {
                agg.remove(oldValue);
                return agg;
            }, Materialized.with(keySerde, aggregatedSerde));
    }
This works well when running against Kafka, but not when testing via
`TopologyTestDriver`.
In both scenarios, when I get an update, the subtractor is called first,
and then the adder is called. The problem is that when using the
TopologyTestDriver, two messages are sent out for each update: one after the
subtractor call, and another one after the adder call. On top of that,
the message sent after the subtractor and before the adder reflects an
incorrect (intermediate) state.
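To make the sequence above concrete, here is a minimal sketch in plain Java with no Kafka dependency. It reuses the adder and subtractor logic from `groupTable` and takes a snapshot of the aggregate after each call, which is roughly what the two output messages would contain. The class name `AggregateUpdateSketch`, the helper `applyUpdate`, and the values "v1" and "v2" are assumptions made for illustration. The sketch does not model how Kafka Streams or the TopologyTestDriver decides which results get forwarded.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AggregateUpdateSketch {

    // Same logic as the adder in groupTable: replace any equal element.
    static <B> Set<B> adder(B newValue, Set<B> agg) {
        agg.remove(newValue);
        agg.add(newValue);
        return agg;
    }

    // Same logic as the subtractor in groupTable: drop the old value.
    static <B> Set<B> subtractor(B oldValue, Set<B> agg) {
        agg.remove(oldValue);
        return agg;
    }

    // Hypothetical helper: applies one table update (old value replaced by
    // new value) and records a copy of the aggregate after each call.
    static <B> List<Set<B>> applyUpdate(B oldValue, B newValue, Set<B> agg) {
        List<Set<B>> snapshots = new ArrayList<>();
        snapshots.add(new HashSet<>(subtractor(oldValue, agg))); // intermediate state
        snapshots.add(new HashSet<>(adder(newValue, agg)));      // final state
        return snapshots;
    }

    public static void main(String[] args) {
        // The aggregate for one grouping key currently holds "v1".
        Set<String> agg = new HashSet<>(Collections.singleton("v1"));
        // The upstream table row changes from "v1" to "v2".
        for (Set<String> snapshot : applyUpdate("v1", "v2", agg)) {
            System.out.println(snapshot);
        }
    }
}
```

The first snapshot is the aggregate with the old value removed and the new one not yet added, which corresponds to the intermediate message described above. The second snapshot is the state expected after the update.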
I created a test case in GitHub to illustrate the issue:
https://github.com/mulho/topology-testcase
Could anyone confirm whether this is a bug? I've tested this with both Kafka
versions 2.0.1 and 2.1.0.
Thanks
Murilo
Re: Bug in TopologyTestDriver
Posted by "Matthias J. Sax" <ma...@confluent.io>.
This question is cross-posted on Stack Overflow. I answered it there:
https://stackoverflow.com/questions/54372134/topologytestdriver-sending-incorrect-message-on-ktable-aggregations