Posted to jira@kafka.apache.org by "Christian Thoudahl (Jira)" <ji...@apache.org> on 2022/01/31 07:52:00 UTC

[jira] [Comment Edited] (KAFKA-10659) Cogroup topology generation fails if input streams are repartitioned

    [ https://issues.apache.org/jira/browse/KAFKA-10659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17484552#comment-17484552 ] 

Christian Thoudahl edited comment on KAFKA-10659 at 1/31/22, 7:51 AM:
----------------------------------------------------------------------

We had the same problem with the following code. Note that the 'Grouped.with(...)' workaround described in this ticket did not work for us:

 
{code:java}
inputStream()
    .transformValues(loggingProcessorSupplier, Named.as("source-logging"))
    .filterNot(mappableEventFilter, Named.as("filter_unmappable_events"))
    .groupByKey(
        Grouped.with("input-by-key",
            stringSerde, thickEventSerde))
    .aggregate(
        () -> null,
        aggregation,
        Named.as("aggregation"),
        store() // Disable caching so we output both remove events and tombstones
    )
    .toStream(Named.as("output-stream-events"))
    .to(topics.getOutputTopic(),
        produced.withName("sink"));

Topology build = streamsBuilder.build();
build.connectProcessorAndStateStores(REMOVAL_TRANSFORMATION, stores.getStore());
{code}
^(business meanings in steps and variables removed not to reveal business intent)^

 

My good colleague Adam Sienkiewicz-Kozyrski and I worked to find out what happened. We didn't solve the problem, but Adam came up with a different workaround that we didn't find elsewhere.

 

Simply adding a '{{.flatMapValues(...)}}' just between transform and groupByKey helped us. Afterwards we added a '{{.mapValues(event -> event)}}' and that also worked.
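
A minimal sketch of where such a no-op step sits in a topology. It only illustrates placement and does not reproduce the failure. Everything in it is an assumption made for illustration and is not taken from the code above: the class name, the topic names, the store name, the String serdes, the trivial aggregator, and the {{selectKey}} step, which merely stands in for whatever key-changing step precedes the grouping.

{code:java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical example class; all names below are placeholders.
public class NoOpMapValuesSketch {

    public static Topology buildTopology() {
        Serde<String> stringSerde = Serdes.String();
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("input-topic", Consumed.with(stringSerde, stringSerde))
            // Stand-in for a key-changing step, so that groupByKey needs a repartition.
            .selectKey((key, value) -> value, Named.as("rekey"))
            // The workaround: an identity mapValues placed directly before groupByKey.
            .mapValues(value -> value, Named.as("noop-map-values"))
            .groupByKey(Grouped.with("input-by-key", stringSerde, stringSerde))
            .aggregate(
                () -> "",
                (key, value, aggregate) -> value, // trivial aggregator: keep the latest value
                Named.as("aggregation"),
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregation-store")
                    .withKeySerde(stringSerde)
                    .withValueSerde(stringSerde)
                    .withCachingDisabled())
            .toStream(Named.as("output-stream-events"))
            .to("output-topic", Produced.with(stringSerde, stringSerde).withName("sink"));

        return builder.build();
    }

    public static void main(String[] args) {
        // Print the generated topology so the position of the no-op node can be inspected.
        System.out.println(buildTopology().describe());
    }
}
{code}

Printing {{describe()}} makes it easy to compare the generated processor names with and without the extra step.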

 

We don't know if this is closely or loosely related, but we came across this ticket while trying to figure out what happened. Hope this can help others.


was (Author: JIRAUSER282487):
We had the same problem with the following code. Note that the 'Grouped.with(...)' workaround described in this ticket did not work for us:

 
{code:java}
inputStream()
    .transformValues(loggingProcessorSupplier, Named.as("source-logging"))
    .filterNot(mappableEventFilter, Named.as("filter_unmappable_events"))
    .transform(cleanupTransoformation, Named.as(REMOVAL_TRANSFORMATION))
    .groupByKey(
        Grouped.with("input-by-key",
            stringSerde, thickEventSerde))
    .aggregate(
        () -> null,
        aggregation,
        Named.as("aggregation"),
        store() // Disable caching so we output both remove events and tombstones
    )
    .toStream(Named.as("output-stream-events"))
    .to(topics.getOutputTopic(),
        produced.withName("sink"));

Topology build = streamsBuilder.build();
build.connectProcessorAndStateStores(REMOVAL_TRANSFORMATION, stores.getStore());
{code}
^(business meanings in steps and variables removed not to reveal business intent)^

 

My good colleague Adam Sienkiewicz-Kozyrski and I worked to find out what happened. We didn't solve the problem, but Adam came up with a different workaround that we didn't find elsewhere.

 

Simply adding a '{{.flatMapValues(...)}}' just between transform and groupByKey helped us.

 

We don't know if this is closely or loosely related, but we came across this ticket while trying to figure out what happened. Hope this can help others.

> Cogroup topology generation fails if input streams are repartitioned
> --------------------------------------------------------------------
>
>                 Key: KAFKA-10659
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10659
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0, 2.5.1
>            Reporter: blueedgenick
>            Priority: Major
>
> Example to reproduce:
>  
> {code:java}
> KGroupedStream<String, A> groupedA = builder
>   .stream(topicA, Consumed.with(Serdes.String(), serdeA))
>   .selectKey((aKey, aVal) -> aVal.someId)
>   .groupByKey();
> KGroupedStream<String, B> groupedB = builder
>   .stream(topicB, Consumed.with(Serdes.String(), serdeB))
>   .selectKey((bKey, bVal) -> bVal.someId)
>   .groupByKey();
> KGroupedStream<String, C> groupedC = builder
>   .stream(topicC, Consumed.with(Serdes.String(), serdeC))
>   .selectKey((cKey, cVal) -> cVal.someId)
>   .groupByKey();
> CogroupedKStream<String, ABC> cogroup = groupedA.cogroup(AggregatorA)
>   .cogroup(groupedB, AggregatorB)
>   .cogroup(groupedC, AggregatorC);
> // Aggregate all streams of the cogroup
> KTable<String, ABC> agg = cogroup.aggregate(
>   () -> new ABC(),
>   Named.as("my-agg-proc-name"),
>   Materialized.<String, ABC, KeyValueStore<Bytes, byte[]>>as("abc-agg-store")
>     .withKeySerde(Serdes.String())
>     .withValueSerde(serdeABC)
> );
> {code}
>  
>  
> This throws an exception during topology generation: 
>  
> {code:java}
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: Processor abc-agg-store-repartition-filter is already added.
>  at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addProcessor(InternalTopologyBuilder.java:485)
>  at org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:70)
>  at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:307)
>  at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:564)
>  at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:553)
>  at ...
> {code}
>  
> The same exception is observed if the `selectKey(...).groupByKey()` pattern is replaced with `groupBy(...)`.
> This behavior is observed with topology optimization at its default setting, explicitly turned off, or explicitly turned on.
> Interestingly, the problem is avoided, and a workable topology is produced, if the grouping step is named by passing a `Grouped.with(...)` expression to either `groupByKey` or `groupBy`.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)