You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Leontiy (Jira)" <ji...@apache.org> on 2022/07/17 13:47:00 UTC

[jira] [Commented] (KAFKA-10659) Cogroup topology generation fails if input streams are repartitioned

    [ https://issues.apache.org/jira/browse/KAFKA-10659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17567646#comment-17567646 ] 

Leontiy commented on KAFKA-10659:
---------------------------------

We had a similar problem as blueedgenick. Adding '{{{}.flatMapValues(...){}}}'  as suggested didn't work. What helped as a workaround:
after selectKey() and before using cogroup(), push result to intermediate topic.
And get stream for cogroup() from that topic.

> Cogroup topology generation fails if input streams are repartitioned
> --------------------------------------------------------------------
>
>                 Key: KAFKA-10659
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10659
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0, 2.5.1
>            Reporter: blueedgenick
>            Priority: Major
>
> Example to reproduce:
>  
> {code:java}
> KGroupedStream<String, A> groupedA = builder
>   .stream(topicA, Consumed.with(Serdes.String(), serdeA))
>   .selectKey((aKey, aVal) -> aVal.someId)
>   .groupByKey();
> KGroupedStream<String, B> groupedB = builder
>   .stream(topicB, Consumed.with(Serdes.String(), serdeB))
>   .selectKey((bKey, bVal) -> bVal.someId)
>   .groupByKey();
> KGroupedStream<String, C> groupedC = builder
>   .stream(topicC, Consumed.with(Serdes.String(), serdeC))
>   .selectKey((cKey, cVal) -> cVal.someId)
>   .groupByKey();
> CogroupedKStream<String, ABC> cogroup = groupedA.cogroup(AggregatorA)
>   .cogroup(groupedB, AggregatorB)
>  .  cogroup(groupedC, AggregatorC);
> // Aggregate all streams of the cogroup
>  KTable<String, ABC> agg = cogroup.aggregate(
>   () -> new ABC(),
>   Named.as("my-agg-proc-name"),
>   Materialized.<String, ABC, KeyValueStore<Bytes, byte[]>>as(
>  "abc-agg-store") 
>  .withKeySerde(Serdes.String())
>  .withValueSerde(serdeABC)
>  );
> {code}
>  
>  
> This throws an exception during topology generation: 
>  
> {code:java}
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: Processor abc-agg-store-repartition-filter is already added. at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addProcessor(Inter
> nalTopologyBuilder.java:485)`
>  at org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:70)
>  at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:307)
>  at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:564)
>  at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:553)
>  at ...
> {code}
>  
> The same exception is observed if the `selectKey(...).groupByKey()`  pattern is replaced with `groupBy(...)`.
> This behavior is observed with topology optimization at default state, explicitly set off, or explicitly set on.
> Interestingly the problem is avoided, and a workable topology produced,, if the grouping step is named by passing a `Grouped.with(...)` expression to either `groupByKey`` or `groupBy`.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)