Posted to jira@kafka.apache.org by "Levani Kokhreidze (JIRA)" <ji...@apache.org> on 2019/05/23 14:11:00 UTC

[jira] [Commented] (KAFKA-8413) Add possibility to do repartitioning on KStream

    [ https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16846740#comment-16846740 ] 

Levani Kokhreidze commented on KAFKA-8413:
------------------------------------------

Happy to work on a KIP if this feature makes sense.

> Add possibility to do repartitioning on KStream
> -----------------------------------------------
>
>                 Key: KAFKA-8413
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8413
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Levani Kokhreidze
>            Priority: Minor
>         Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
>  
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>    .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>    .selectKey((key, value) -> value);
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-1")
>    );
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-2")
>    );
> {code}
>  
> This code generates the following topology:
>  
> {code:java}
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
>       --> KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
>       --> KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000008
>       <-- KSTREAM-SOURCE-0000000000
>     Processor: KSTREAM-FILTER-0000000004 (stores: [])
>       --> KSTREAM-SINK-0000000003
>       <-- KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-FILTER-0000000008 (stores: [])
>       --> KSTREAM-SINK-0000000007
>       <-- KSTREAM-KEY-SELECT-0000000001
>     Sink: KSTREAM-SINK-0000000003 (topic: store-1-repartition)
>       <-- KSTREAM-FILTER-0000000004
>     Sink: KSTREAM-SINK-0000000007 (topic: store-2-repartition)
>       <-- KSTREAM-FILTER-0000000008
>
>    Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000005 (topics: [store-1-repartition])
>       --> KSTREAM-AGGREGATE-0000000002
>     Processor: KSTREAM-AGGREGATE-0000000002 (stores: [store-1])
>       --> none
>       <-- KSTREAM-SOURCE-0000000005
>
>    Sub-topology: 2
>     Source: KSTREAM-SOURCE-0000000009 (topics: [store-2-repartition])
>       --> KSTREAM-AGGREGATE-0000000006
>     Processor: KSTREAM-AGGREGATE-0000000006 (stores: [store-2])
>       --> none
>       <-- KSTREAM-SOURCE-0000000009
> {code}
> Kafka Streams creates a separate repartition topic for each `groupByKey` operation, two in total. In this example the second repartition topic is not really necessary: both aggregations group by the same key, so they could be processed in one sub-topology fed by a single repartition topic.
> In the DSL, a Kafka Streams user can specify the repartition topic manually with the *KStream#through* method:
>  
> {code:java}
> final KStream<Object, Object> streamByProfileId = streamsBuilder
>    .stream("input-topic")
>    .selectKey((key, value) -> value)
>    .through("repartition-topic");
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-1")
>    );
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-2")
>    );
> {code}
> This code generates the following topology:
>  
> {code:java}
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
>       --> KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
>       --> KSTREAM-SINK-0000000002
>       <-- KSTREAM-SOURCE-0000000000
>     Sink: KSTREAM-SINK-0000000002 (topic: repartition-topic)
>       <-- KSTREAM-KEY-SELECT-0000000001
>
>    Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000003 (topics: [repartition-topic])
>       --> KSTREAM-AGGREGATE-0000000004, KSTREAM-AGGREGATE-0000000005
>     Processor: KSTREAM-AGGREGATE-0000000004 (stores: [store-1])
>       --> none
>       <-- KSTREAM-SOURCE-0000000003
>     Processor: KSTREAM-AGGREGATE-0000000005 (stores: [store-2])
>       --> none
>       <-- KSTREAM-SOURCE-0000000003
> {code}
>  
>  
> While this makes it possible to optimize a Kafka Streams application, the user still has to create the repartition topic manually, with the correct number of partitions for the input topic. It would be great if the DSL offered something like a *repartition()* operation on *KStream* that generates the repartition topic on request.
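
To illustrate why a single repartition step can serve both aggregations in the issue above: once records are re-keyed and routed by the new key, every record with a given key lands in one partition, so any number of downstream aggregations can read the same repartitioned data. The sketch below simulates this with plain Java collections only. It is not Kafka Streams code; the class name `RepartitionSketch`, the `hashCode`-based partitioner (Kafka's default partitioner hashes the serialized key with murmur2 instead), and the sample values are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RepartitionSketch {

    // Hypothetical stand-in for a partitioner. Kafka's default partitioner
    // applies murmur2 to the serialized key; hashCode() is used here only
    // to keep the sketch free of dependencies.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Routes each record to a partition chosen by its new key. The new key is
    // the value itself, mirroring selectKey((key, value) -> value) in the issue.
    static List<List<String>> repartitionByValue(List<String> values, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String value : values) {
            partitions.get(partitionFor(value, numPartitions)).add(value);
        }
        return partitions;
    }

    // A per-key count over one partition, standing in for one aggregation.
    // A second aggregation could read the same partition without another shuffle.
    static Map<String, Long> countPerKey(List<String> partition) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : partition) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> values = List.of("p1", "p2", "p1", "p3", "p2", "p1");
        List<List<String>> partitions = repartitionByValue(values, 3);
        for (int p = 0; p < partitions.size(); p++) {
            System.out.println("partition " + p + " -> " + countPerKey(partitions.get(p)));
        }
    }
}
```

Because the partition is a pure function of the key, the count computed inside one partition is already the complete count for that key. That property is what lets the *KStream#through* variant feed both `store-1` and `store-2` from one topic.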


