You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Levani Kokhreidze (Jira)" <ji...@apache.org> on 2020/02/19 21:34:00 UTC

[jira] [Updated] (KAFKA-4835) Allow users control over repartitioning

     [ https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Levani Kokhreidze updated KAFKA-4835:
-------------------------------------
    Fix Version/s: 2.6.0

> Allow users control over repartitioning
> ---------------------------------------
>
>                 Key: KAFKA-4835
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4835
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Michal Borowiecki
>            Assignee: Levani Kokhreidze
>            Priority: Major
>              Labels: kip
>             Fix For: 2.6.0
>
>
> From https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning. 
>  My use case is as follows (unrelated bits omitted for brevity):
> {code:java}
> 		KTable<String, Activity> loggedInCustomers = builder
> 			.stream("customerLogins")
> 			.groupBy((key, activity) -> 
> 				activity.getCustomerRef())
> 			.reduce((first,second) -> second, loginStore());
> 		
> 		builder
> 			.stream("balanceUpdates")
> 			.map((key, activity) -> new KeyValue<>(
> 				activity.getCustomerRef(),
> 				activity))
> 			.join(loggedInCustomers, (activity, session) -> ...
> 			.to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the repartitionRequired flag (since the key changes), and the aggregation/join that follows will create the repartitioned topic.
>  However, in our case I know that both input streams are already partitioned by the customerRef value, which I'm mapping into the key (because it's required by the join operation).
>  So there are 2 unnecessary intermediate topics created with their associated overhead, while the ultimate goal is simply to do a join on a value that we already use to partition the original streams anyway.
>  (Note, we don't have the option to re-implement the original input streams to make customerRef the message key.)
> I think it would be better to allow the user to decide (from their knowledge of the incoming streams) whether a repartition is mandatory on aggregation and join operations (overloaded version of the methods with the repartitionRequired flag exposed maybe?)
>  An alternative would be to allow users to perform a join on a value other than the key (a keyValueMapper parameter to join, like the one used for joins with global tables), but I expect that to be more involved and error-prone to use for people who don't understand the partitioning requirements well (whereas it's safe for global tables).
>  
> KIP: 221 [https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)