You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Tomasz Kaszuba (Jira)" <ji...@apache.org> on 2022/06/07 12:51:00 UTC

[jira] [Created] (KAFKA-13963) Topology Description ignores context.forward

Tomasz Kaszuba created KAFKA-13963:
--------------------------------------

             Summary: Topology Description ignores context.forward
                 Key: KAFKA-13963
                 URL: https://issues.apache.org/jira/browse/KAFKA-13963
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.7.2
            Reporter: Tomasz Kaszuba


I have a simple topology:
{code:java}
      val topology = new Topology
      topology
        .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic)
        .addProcessor(
          "process",
          new ProcessorSupplier[String, String] {
            override def get(): Processor[String, String] =
              new RecordCollectorProcessor()
          },
          "source"
        ) {code}
And a simple processor that uses context.forward to forward messages:
{code:java}
  private class ContextForwardProcessor extends AbstractProcessor[String, String]() {    override def process(key: String, value: String): Unit =
      context().forward("key", "value", To.child("output"))    override def close(): Unit = ()
  }  {code}
when I call topology.describe() I receive this:
{noformat}
Topologies:
   Sub-topology: 0
    Source: source (topics: [input])
      --> process
    Processor: process (stores: [])
      --> none
      <-- source {noformat}
Ignoring the fact that this will not run since it will throw a runtime exception why is the To.child ignored?

Taking it one point further if I add multiple sinks to the topology like so:
{code:java}
val topology = new Topology
      topology
        .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic)
        .addProcessor(
          "process",
          new ProcessorSupplier[String, String] {
            override def get(): Processor[String, String] =
              new ContextForwardProcessor()
          },
          "source"
        )
        .addSink("sink", "output1", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process")
        .addSink("sink2", "output2", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process")  {code}
but have the processor only output to "output1" it is in no way reflected in the described topology graph.

I assume this is by design since it's a lot more work to interpret what the context.forward is doing but when I tried to look for this information in the java doc I couldn't find it.

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)