Posted to jira@kafka.apache.org by "Leontiy (Jira)" <ji...@apache.org> on 2022/07/17 13:47:00 UTC
[jira] [Commented] (KAFKA-10659) Cogroup topology generation fails if input streams are repartitioned
[ https://issues.apache.org/jira/browse/KAFKA-10659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17567646#comment-17567646 ]
Leontiy commented on KAFKA-10659:
---------------------------------
We had a problem similar to the one blueedgenick reported. Adding {{.flatMapValues(...)}} as suggested didn't work. What helped as a workaround:
after selectKey() and before cogroup(), write the result to an intermediate topic,
then read the stream for cogroup() from that topic.
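A minimal sketch of that workaround, not our actual code. The topic names, the String serdes, the store name, the class name and the concatenating aggregator are all placeholders chosen for illustration, and the intermediate topics are assumed to already exist with matching partition counts. Each input is re-keyed and written out with to(); the grouped streams are then built from fresh stream() sources, so groupByKey() should not need a repartition step of its own.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.CogroupedKStream;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical example class; all names below are placeholders.
final class CogroupViaIntermediateTopics {

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Step 1: re-key each input and write it to an intermediate topic.
        // (k, v) -> v stands in for something like aVal.someId.
        builder.stream("topic-a", Consumed.with(Serdes.String(), Serdes.String()))
               .selectKey((k, v) -> v)
               .to("topic-a-rekeyed", Produced.with(Serdes.String(), Serdes.String()));
        builder.stream("topic-b", Consumed.with(Serdes.String(), Serdes.String()))
               .selectKey((k, v) -> v)
               .to("topic-b-rekeyed", Produced.with(Serdes.String(), Serdes.String()));

        // Step 2: read the re-keyed data back as new source streams and group them.
        KGroupedStream<String, String> groupedA = builder
                .stream("topic-a-rekeyed", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
        KGroupedStream<String, String> groupedB = builder
                .stream("topic-b-rekeyed", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()));

        // Placeholder aggregator: appends each value to the running aggregate.
        Aggregator<String, String, String> concat = (key, value, agg) -> agg + value;

        // Step 3: cogroup the streams that were read from the intermediate topics.
        CogroupedKStream<String, String> cogrouped = groupedA
                .cogroup(concat)
                .cogroup(groupedB, concat);

        KTable<String, String> agg = cogrouped.aggregate(
                () -> "",
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("ab-agg-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        // Print the generated topology to inspect sources, sinks and stores.
        System.out.println(build().describe());
    }
}
```

The cost of this approach is one extra, manually managed topic per input stream.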
> Cogroup topology generation fails if input streams are repartitioned
> --------------------------------------------------------------------
>
> Key: KAFKA-10659
> URL: https://issues.apache.org/jira/browse/KAFKA-10659
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.6.0, 2.5.1
> Reporter: blueedgenick
> Priority: Major
>
> Example to reproduce:
>
> {code:java}
> KGroupedStream<String, A> groupedA = builder
> .stream(topicA, Consumed.with(Serdes.String(), serdeA))
> .selectKey((aKey, aVal) -> aVal.someId)
> .groupByKey();
> KGroupedStream<String, B> groupedB = builder
> .stream(topicB, Consumed.with(Serdes.String(), serdeB))
> .selectKey((bKey, bVal) -> bVal.someId)
> .groupByKey();
> KGroupedStream<String, C> groupedC = builder
> .stream(topicC, Consumed.with(Serdes.String(), serdeC))
> .selectKey((cKey, cVal) -> cVal.someId)
> .groupByKey();
> CogroupedKStream<String, ABC> cogroup = groupedA.cogroup(AggregatorA)
> .cogroup(groupedB, AggregatorB)
> .cogroup(groupedC, AggregatorC);
> // Aggregate all streams of the cogroup
> KTable<String, ABC> agg = cogroup.aggregate(
> () -> new ABC(),
> Named.as("my-agg-proc-name"),
> Materialized.<String, ABC, KeyValueStore<Bytes, byte[]>>as(
> "abc-agg-store")
> .withKeySerde(Serdes.String())
> .withValueSerde(serdeABC)
> );
> {code}
>
>
> This throws an exception during topology generation:
>
> {code:java}
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: Processor abc-agg-store-repartition-filter is already added.
> at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addProcessor(InternalTopologyBuilder.java:485)
> at org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:70)
> at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:307)
> at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:564)
> at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:553)
> at ...
> {code}
>
> The same exception is observed if the `selectKey(...).groupByKey()` pattern is replaced with `groupBy(...)`.
> This behavior is observed with topology optimization at default state, explicitly set off, or explicitly set on.
> Interestingly, the problem is avoided, and a workable topology produced, if the grouping step is named by passing a `Grouped.with(...)` expression to either `groupByKey` or `groupBy`.
>