Posted to jira@kafka.apache.org by "Matthias J. Sax (Jira)" <ji...@apache.org> on 2022/06/07 20:09:00 UTC

[jira] [Commented] (KAFKA-13963) Topology Description ignores context.forward

    [ https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551271#comment-17551271 ] 

Matthias J. Sax commented on KAFKA-13963:
-----------------------------------------

`TopologyDescription` only describes the structure of your graph of operators. In your first example, you only added two nodes to the graph ("source" and "process"). There is no node named "output", and thus it's not contained in the `TopologyDescription`.
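To illustrate the point about structure, here is a minimal sketch, assuming the same 2.7-era Processor API as in the report. The class `ForwardToOutput`, the object `DescribeWithSink`, and the topic names "input" and "output-topic" are hypothetical names chosen for this example. Once a node named "output" is added to the graph as a child of "process", it should show up in the description:

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.{AbstractProcessor, Processor, ProcessorSupplier, To}

// Hypothetical processor: forwards every record to the child node named "output".
class ForwardToOutput extends AbstractProcessor[String, String] {
  override def process(key: String, value: String): Unit =
    context().forward(key, value, To.child("output"))
}

object DescribeWithSink {
  // Builds source -> process -> output. The sink "output" is a real node in
  // the graph, so describe() can report it. Topic names are assumptions.
  def build(): Topology = {
    val strings = Serdes.String()
    new Topology()
      .addSource("source", strings.deserializer(), strings.deserializer(), "input")
      .addProcessor(
        "process",
        new ProcessorSupplier[String, String] {
          override def get(): Processor[String, String] = new ForwardToOutput
        },
        "source"
      )
      .addSink("output", "output-topic", strings.serializer(), strings.serializer(), "process")
  }

  def main(args: Array[String]): Unit =
    println(build().describe())
}
```

The description is derived from the `addSource`/`addProcessor`/`addSink` calls alone; the body of `process()` is never inspected.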

It's not really possible to take the business logic (i.e., what `forward()` is doing) into account; at least I have no idea how this could be done with reasonable effort.
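As a rough sketch of why this is hard (assuming the 2.7-era Processor API from the report; `RoutingProcessor`, `SingleChildProcessor`, and `RoutingExample` are hypothetical names), consider a processor that picks its child per record. Which sink receives data depends on the values seen at runtime, so the description can only reflect the declared edges:

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.{AbstractProcessor, Processor, ProcessorSupplier, To}

// Hypothetical processor: the target child is chosen from the record value.
class RoutingProcessor extends AbstractProcessor[String, String] {
  override def process(key: String, value: String): Unit = {
    val child = if (value.startsWith("a")) "sink" else "sink2"
    context().forward(key, value, To.child(child))
  }
}

// Hypothetical processor: always forwards to "sink" and never to "sink2".
class SingleChildProcessor extends AbstractProcessor[String, String] {
  override def process(key: String, value: String): Unit =
    context().forward(key, value, To.child("sink"))
}

object RoutingExample {
  // Same graph as in the report: one source, one processor, two sinks.
  def build(supplier: ProcessorSupplier[String, String]): Topology = {
    val strings = Serdes.String()
    new Topology()
      .addSource("source", strings.deserializer(), strings.deserializer(), "input")
      .addProcessor("process", supplier, "source")
      .addSink("sink", "output1", strings.serializer(), strings.serializer(), "process")
      .addSink("sink2", "output2", strings.serializer(), strings.serializer(), "process")
  }

  val routing: Topology = build(new ProcessorSupplier[String, String] {
    override def get(): Processor[String, String] = new RoutingProcessor
  })

  val single: Topology = build(new ProcessorSupplier[String, String] {
    override def get(): Processor[String, String] = new SingleChildProcessor
  })
}
```

Both topologies declare the same nodes and edges, so `RoutingExample.routing.describe()` and `RoutingExample.single.describe()` should print the same text even though the two processors forward differently.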

It's for sure not a bug. We should either close this ticket or change it into a feature request.

> Topology Description ignores context.forward
> --------------------------------------------
>
>                 Key: KAFKA-13963
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13963
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.2
>            Reporter: Tomasz Kaszuba
>            Priority: Minor
>
> I have a simple topology:
> {code:java}
>       val topology = new Topology
>       topology
>         .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic)
>         .addProcessor(
>           "process",
>           new ProcessorSupplier[String, String] {
>             override def get(): Processor[String, String] =
>               new RecordCollectorProcessor()
>           },
>           "source"
>         ) {code}
> And a simple processor that uses context.forward to forward messages:
> {code:java}
>   private class ContextForwardProcessor extends AbstractProcessor[String, String]() {
>     override def process(key: String, value: String): Unit =
>       context().forward("key", "value", To.child("output"))
>
>     override def close(): Unit = ()
>   }  {code}
> When I call topology.describe(), I receive this:
> {noformat}
> Topologies:
>    Sub-topology: 0
>     Source: source (topics: [input])
>       --> process
>     Processor: process (stores: [])
>       --> none
>       <-- source {noformat}
> Ignoring the fact that this will not run, since it will throw a runtime exception, why is the To.child ignored?
> Taking it one step further, if I add multiple sinks to the topology like so:
> {code:java}
>       val topology = new Topology
>       topology
>         .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic)
>         .addProcessor(
>           "process",
>           new ProcessorSupplier[String, String] {
>             override def get(): Processor[String, String] =
>               new ContextForwardProcessor()
>           },
>           "source"
>         )
>         .addSink("sink", "output1", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process")
>         .addSink("sink2", "output2", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process")  {code}
> but have the processor only output to "output1", this is in no way reflected in the described topology graph.
> I assume this is by design, since it's a lot more work to interpret what context.forward is doing, but when I looked for this information in the Javadoc I couldn't find it.
>  


