Posted to jira@kafka.apache.org by "Guozhang Wang (Jira)" <ji...@apache.org> on 2021/10/01 02:35:00 UTC

[jira] [Commented] (KAFKA-13336) Migrate StreamsBuilder/Topology class to interfaces and move Topology parameter from KafkaStreams constructor to #start

    [ https://issues.apache.org/jira/browse/KAFKA-13336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17423072#comment-17423072 ] 

Guozhang Wang commented on KAFKA-13336:
---------------------------------------

[~ableegoldman] Thanks for the proposal! I think I agree with most of it.

Ironically, we did once have a {{TopologyBuilder}} for the PAPI alongside {{StreamsBuilder extends TopologyBuilder}} for the DSL. Both of them generated a {{Topology}}, and the goal was that with {{StreamsBuilder}} users could use both the DSL and the PAPI to construct the {{Topology}}. But the underlying inner classes were a mess. Well, maybe no messier than what we ended up with today, to be fair :) We then decided to change to the current model, which totally separates the DSL and the PAPI, with {{StreamsBuilder}} generating a {{Topology}}.

Personally, I still feel that the current separation works better, but I do agree that:

* A written topology should not be re-written inside the KafkaStreams constructor, or even later. The topology itself should only be optimized during the builder phase, and once it's out of the "build()" call it's final. This means that the configuration should be associated with a builder, and hence its topology. So there should be a 1:1:1 mapping from builder, to topology, to their corresponding configs.
* To support a 1:N mapping from the streams instance to multiple topologies, I think it's reasonable to only construct a "StreamsBuilder" from the "KafkaStreams" object instead of creating a "StreamsBuilder" from its own constructor. In that case you'd have a two-layer configuration system: 1) first create a "KafkaStreams" object with `streams = new KafkaStreams(config1)`, then 2) `streams#newTopologyBuilder(config2)` where config1 is global and config2 is per that builder/topology.
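
To make the second bullet concrete, here is a minimal sketch of the two-layer configuration. It is only an illustration under stated assumptions: {{SketchStreams}}, {{SketchTopologyBuilder}} and {{SketchTopology}} are hypothetical stand-ins and not the real Kafka Streams classes, and the rule that per-topology configs override the global ones on conflict is my assumption, not something decided in this ticket.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch: none of these types are real Kafka Streams classes.
public class TwoLayerConfigSketch {

    /** Final result of build(); carries the configs it was built with. */
    static final class SketchTopology {
        final Properties configs;

        SketchTopology(final Properties configs) {
            this.configs = configs;
        }
    }

    /** One builder yields exactly one topology (builder : topology : configs = 1:1:1). */
    static final class SketchTopologyBuilder {
        private final Properties configs;
        private boolean built = false;

        SketchTopologyBuilder(final Properties configs) {
            this.configs = configs;
        }

        SketchTopology build() {
            if (built) {
                throw new IllegalStateException("builder already produced its topology");
            }
            built = true;
            return new SketchTopology(configs);
        }
    }

    /** Stand-in for KafkaStreams: holds config1 and hands out builders (1:N). */
    static final class SketchStreams {
        private final Properties globalConfigs;
        private final List<SketchTopologyBuilder> builders = new ArrayList<>();

        SketchStreams(final Properties globalConfigs) {
            this.globalConfigs = globalConfigs;
        }

        SketchTopologyBuilder newTopologyBuilder(final Properties topologyConfigs) {
            // Assumption: per-topology configs (config2) override global ones (config1).
            final Properties merged = new Properties();
            merged.putAll(globalConfigs);
            merged.putAll(topologyConfigs);
            final SketchTopologyBuilder builder = new SketchTopologyBuilder(merged);
            builders.add(builder);
            return builder;
        }

        int builderCount() {
            return builders.size();
        }
    }

    public static void main(final String[] args) {
        final Properties config1 = new Properties();
        config1.setProperty("application.id", "demo-app");

        final Properties config2 = new Properties();
        config2.setProperty("topology.optimization", "all");

        final SketchStreams streams = new SketchStreams(config1);
        final SketchTopology topology = streams.newTopologyBuilder(config2).build();
        System.out.println(topology.configs);
    }
}
```

The point of the sketch is only the shape of the API: the streams object is created first with the global configs, each builder gets its own configs at creation time, and a builder cannot be reused once {{build()}} has returned.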

> Migrate StreamsBuilder/Topology class to interfaces and move Topology parameter from KafkaStreams constructor to #start
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13336
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13336
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: A. Sophie Blee-Goldman
>            Priority: Major
>              Labels: needs-kip
>
> In order to refactor and improve the streams physical plan generation, we'll need to clean up the DSL builder API a bit and in particular enforce that the configs be passed in from the beginning, rather than only when calling #build. We can also use this opportunity to fix the disconnect between the builder, the resulting Topology, and the Kafka Streams application that ultimately runs this topology – at the moment these are all completely uncoupled on the surface, so it's easy to think that a StreamsBuilder can be reused to build multiple Topology objects, or that a Topology object could be passed in to different KafkaStreams. However, there is internal state that is shared and modified during StreamsBuilder#build and in the KafkaStreams constructor, and they are actually tightly coupled under the hood, meaning there must be a 1:1:1 ratio of builder to topology to KafkaStreams. So we need a new API that
>  # Forces users to pass in the configs (Properties) when constructing the builder
>  # Clarifies the relationship of the builder object to the topology, and to the app itself
> I think a good API for this might look something like this:
>  # Move the StreamsBuilder class to an internal one (technically we would need to keep it where it is for now until a full deprecation cycle)
>  # Introduce a TopologyBuilder interface to replace the functionality of the current StreamsBuilder class, and have StreamsBuilder implement this. All the current methods on StreamsBuilder will be moved to the TopologyBuilder interfaces
>  # Move the Topology parameter out of the KafkaStreams constructor, and into the KafkaStreams#start method, so you can construct a KafkaStreams object before the Topology
>  # Add a factory method on KafkaStreams for users to get instances of the TopologyBuilder, and have this accept a Properties. For example
> {code:java}
> class KafkaStreams {
>     public TopologyBuilder newTopologyBuilder(final Properties props) {
>         // convert to StreamsConfig to validate configs & check for application.id
>         final StreamsConfig config = new StreamsConfig(props); 
>         return new StreamsBuilder(config);
>     }
> }{code}
> This should satisfy both of the requirements, and imo provides a cleaner API anyways. Getting the builder through a factory method on the KafkaStreams object should make it clear that this builder is tied to that particular KafkaStreams instance. And we can enforce that it isn't reused for a different application by parsing the Properties passed in to KafkaStreams#newTopologyBuilder, specifically the application.id. It also leads to a more natural process of writing a Kafka Streams app: start with the KafkaStreams object and global configs, then use this to build up the processing topology. Looking forward, this will better complement the new named topologies feature, with an API that treats topologies as entities attached to a particular KafkaStreams but that may come and go.


