You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Nikki Thean (JIRA)" <ji...@apache.org> on 2018/06/13 14:14:00 UTC

[jira] [Created] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent

Nikki Thean created KAFKA-7055:
----------------------------------

             Summary: Kafka Streams Processor API allows you to add sinks and processors without parent
                 Key: KAFKA-7055
                 URL: https://issues.apache.org/jira/browse/KAFKA-7055
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 1.0.0
            Reporter: Nikki Thean
            Assignee: Nikki Thean


The Kafka Streams Processor API allows you to define a Topology and connect sources, processors, and sinks. From reading through the code, it seems that you cannot forward a message to a downstream node unless it is explicitly connected to the upstream node (from which you are forwarding the message) as a child. ([example|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]] where you forward using name of downstream node rather than child index)

However, I've been able to connect processors and sinks to the topology without including parent names, i.e with empty vararg (using [this method|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423]).]

As any attempt to forward a message to those nodes will throw a StreamsException, I suggest throwing an exception if a processor or sink is added without at least one upstream node. There is a method in `InternalTopologyBuilder` that allows you to connect processors by name after you add them to the topology, but it is not part of the external Processor API.

In addition (or alternatively), I suggest making [the error message for when users try to forward messages to a node that is not connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119] more descriptive, like [this one for when a user attempts to access a state store that is not connected to the processor|[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81].]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)