You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "John Roesler (Jira)" <ji...@apache.org> on 2020/03/02 15:23:00 UTC

[jira] [Commented] (KAFKA-4835) Allow users control over repartitioning

    [ https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17049315#comment-17049315 ] 

John Roesler commented on KAFKA-4835:
-------------------------------------

Hey [~lkokhreidze] ,

 

This ticket is listed in KIP-221, but it doesn't seem like the KIP ultimately wound up satisfying this particular request. Are you planning to also implement this ticket, or should we unlink it and unassign it in the hopes that someone else will pick it up?

 

Thanks,

-John

> Allow users control over repartitioning
> ---------------------------------------
>
>                 Key: KAFKA-4835
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4835
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Michal Borowiecki
>            Assignee: Levani Kokhreidze
>            Priority: Major
>              Labels: kip
>             Fix For: 2.6.0
>
>
> From https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning. 
>  My use case is as follows (unrelated bits omitted for brevity):
> {code:java}
> 		KTable<String, Activity> loggedInCustomers = builder
> 			.stream("customerLogins")
> 			.groupBy((key, activity) -> 
> 				activity.getCustomerRef())
> 			.reduce((first,second) -> second, loginStore());
> 		
> 		builder
> 			.stream("balanceUpdates")
> 			.map((key, activity) -> new KeyValue<>(
> 				activity.getCustomerRef(),
> 				activity))
> 			.join(loggedInCustomers, (activity, session) -> ...
> 			.to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the repartitionRequired flag (since the key changes), and the aggregation/join that follows will create the repartitioned topic.
>  However, in our case I know that both input streams are already partitioned by the customerRef value, which I'm mapping into the key (because it's required by the join operation).
>  So there are 2 unnecessary intermediate topics created with their associated overhead, while the ultimate goal is simply to do a join on a value that we already use to partition the original streams anyway.
>  (Note, we don't have the option to re-implement the original input streams to make customerRef the message key.)
> I think it would be better to allow the user to decide (from their knowledge of the incoming streams) whether a repartition is mandatory on aggregation and join operations (overloaded version of the methods with the repartitionRequired flag exposed maybe?)
>  An alternative would be to allow users to perform a join on a value other than the key (a keyValueMapper parameter to join, like the one used for joins with global tables), but I expect that to be more involved and error-prone to use for people who don't understand the partitioning requirements well (whereas it's safe for global tables).
>  
> KIP: 221 [https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)