Posted to jira@kafka.apache.org by "A. Sophie Blee-Goldman (Jira)" <ji...@apache.org> on 2021/03/17 16:55:00 UTC

[jira] [Assigned] (KAFKA-12446) Define KGroupedTable#aggregate subtractor + adder order of execution

     [ https://issues.apache.org/jira/browse/KAFKA-12446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

A. Sophie Blee-Goldman reassigned KAFKA-12446:
----------------------------------------------

    Assignee: Ben Ellis

> Define KGroupedTable#aggregate subtractor + adder order of execution
> --------------------------------------------------------------------
>
>                 Key: KAFKA-12446
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12446
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Ben Ellis
>            Assignee: Ben Ellis
>            Priority: Minor
>
> Currently, when an update is processed by KGroupedTable#aggregate, the subtractor is called first, then the adder. But per the docs, the order of execution is not defined (i.e., it could change in future releases).
> [https://kafka.apache.org/26/documentation/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-aggregating]
> {quote}When subsequent non-null values are received for a key (e.g., UPDATE), then (1) the subtractor is called with the old value as stored in the table and (2) the adder is called with the new value of the input record that was just received. The order of execution for the subtractor and adder is not defined.
> {quote}
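To make the order under discussion concrete, here is a minimal, dependency-free Java model of how one input change reaches a grouped aggregate. It is not the Kafka Streams implementation. The class `GroupedTableModel` and its `apply` method are hypothetical. The model assumes the grouping key is unchanged by the update, and it omits the key argument that a real `Aggregator` receives. It encodes the current behavior the ticket describes: the subtractor runs with the old value first, then the adder runs with the new value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical model, NOT Kafka Streams code: it only mirrors the documented
// contract (subtractor gets the old value, adder gets the new value) in the
// order the ticket proposes to guarantee: subtractor first, then adder.
class GroupedTableModel<K, V, A> {
    private final Map<K, A> aggregates = new HashMap<>();
    private final Supplier<A> initializer;
    private final BiFunction<V, A, A> adder;      // (value, aggregate) -> new aggregate
    private final BiFunction<V, A, A> subtractor; // (value, aggregate) -> new aggregate

    GroupedTableModel(Supplier<A> initializer,
                      BiFunction<V, A, A> adder,
                      BiFunction<V, A, A> subtractor) {
        this.initializer = initializer;
        this.adder = adder;
        this.subtractor = subtractor;
    }

    // oldValue == null models an INSERT, newValue == null models a DELETE,
    // and both non-null models an UPDATE. Assumes the group key is unchanged.
    void apply(K groupKey, V oldValue, V newValue) {
        A agg = aggregates.containsKey(groupKey) ? aggregates.get(groupKey) : initializer.get();
        if (oldValue != null) {
            agg = subtractor.apply(oldValue, agg); // (1) retract the old value
        }
        if (newValue != null) {
            agg = adder.apply(newValue, agg);      // (2) add the new value
        }
        aggregates.put(groupKey, agg);
    }

    A get(K groupKey) {
        return aggregates.get(groupKey);
    }

    public static void main(String[] args) {
        // Running sum per group: insert 5, update 5 -> 7, then delete.
        GroupedTableModel<String, Integer, Integer> sums =
            new GroupedTableModel<>(() -> 0, (v, a) -> a + v, (v, a) -> a - v);
        sums.apply("g", null, 5);
        System.out.println(sums.get("g")); // 5
        sums.apply("g", 5, 7);
        System.out.println(sums.get("g")); // 7
        sums.apply("g", 7, null);
        System.out.println(sums.get("g")); // 0
    }
}
```

A sum is shown deliberately. Its adder and subtractor commute, so this particular aggregate would behave the same under either order.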
> This ticket proposes making the current order of execution part of the public contract.
> That would allow Kafka Streams DSL users the freedom to use aggregates such as: 
> {code:java}
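> // newValue/oldValue are assumed to expose getKey()/getValue() (e.g., Map.Entry)
> aggregate(
>   HashMap::new, // initializer: each group starts with an empty map
>   (aggKey, newValue, aggValue) -> { aggValue.put(newValue.getKey(), newValue.getValue()); return aggValue; }, // adder
>   (aggKey, oldValue, aggValue) -> { aggValue.remove(oldValue.getKey()); return aggValue; } // subtractor
> ){code}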
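> and correctly handle updates where the key stays the same but the value changes: the subtractor removes the old entry before the adder puts the new one, whereas the reverse order would delete the freshly written entry.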
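A plain-Java replay of the map aggregate above shows why the order matters for it. This is an illustrative sketch with some assumptions. Input values are modeled as `Map.Entry<String, Integer>`. The class `MapAggregateOrder` and the key `"k1"` are hypothetical. No Kafka classes are involved. `update` simply runs the ticket's adder and subtractor in either order on a copy of the aggregate.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;

// Illustrative replay of the ticket's HashMap aggregate, outside Kafka Streams.
// Input values are modeled as Map.Entry<String, Integer> (an assumption).
class MapAggregateOrder {

    // Adder from the ticket: put (or overwrite) the entry for the value's key.
    static Map<String, Integer> adder(Map.Entry<String, Integer> newValue, Map<String, Integer> agg) {
        agg.put(newValue.getKey(), newValue.getValue());
        return agg;
    }

    // Subtractor from the ticket: remove the entry for the old value's key.
    static Map<String, Integer> subtractor(Map.Entry<String, Integer> oldValue, Map<String, Integer> agg) {
        agg.remove(oldValue.getKey());
        return agg;
    }

    // Runs one UPDATE in the chosen order on a copy, leaving the input untouched.
    static Map<String, Integer> update(Map<String, Integer> agg,
                                       Map.Entry<String, Integer> oldValue,
                                       Map.Entry<String, Integer> newValue,
                                       boolean subtractorFirst) {
        Map<String, Integer> copy = new HashMap<>(agg);
        return subtractorFirst
            ? adder(newValue, subtractor(oldValue, copy))
            : subtractor(oldValue, adder(newValue, copy));
    }

    public static void main(String[] args) {
        Map<String, Integer> agg = new HashMap<>();
        agg.put("k1", 3);
        // Same map key, new value: ("k1", 3) -> ("k1", 4)
        Map.Entry<String, Integer> oldValue = new SimpleEntry<>("k1", 3);
        Map.Entry<String, Integer> newValue = new SimpleEntry<>("k1", 4);
        System.out.println(update(agg, oldValue, newValue, true));  // {k1=4}
        System.out.println(update(agg, oldValue, newValue, false)); // {}
    }
}
```

When the subtractor runs first, the entry ends up holding the new value. When the adder runs first, the remove immediately undoes the put and the entry disappears.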
> The Kafka Music Example at
> [https://github.com/confluentinc/kafka-streams-examples/blob/6.0.1-post/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java#L345]
> relies on the subtractor being called first.
>  
> See discussion at [https://github.com/confluentinc/kafka-streams-examples/issues/380]
> See also the more general point made at [https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined] 
> {quote}If the adder and subtractor are non-commutative operations and the order in which they are executed can vary, you can end up with different results depending on the order of execution of adder and subtractor. An example of a useful non-commutative operation would be something like if we’re aggregating records into a Set:
> {quote}
> {code:scala}
> .aggregate[Set[Animal]](Set.empty)(
>  adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
>  subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
> )
> {code}
> {quote}In this example, for duplicated events, if the adder is called before the subtractor you would end up removing the value entirely from the set (which would be problematic for most use-cases I imagine).
> {quote}
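The same effect can be reproduced in Java for the Set example and contrasted with a running sum, whose adder and subtractor commute. This is a hypothetical sketch, not Kafka Streams code. The class `ExecutionOrderDemo` is illustrative, and animals are represented as strings. The sketch replays one duplicated update, where the old and new value are equal, in both orders.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiFunction;

// Hypothetical sketch, not Kafka Streams code: replays a single UPDATE
// in both possible orders so the results can be compared.
class ExecutionOrderDemo {

    static <V, A> A update(A agg, V oldValue, V newValue,
                           BiFunction<V, A, A> adder,
                           BiFunction<V, A, A> subtractor,
                           boolean subtractorFirst) {
        return subtractorFirst
            ? adder.apply(newValue, subtractor.apply(oldValue, agg))
            : subtractor.apply(oldValue, adder.apply(newValue, agg));
    }

    // Set aggregate: add and remove do not commute. Each call returns a new set.
    static Set<String> setAdd(String animal, Set<String> zoo) {
        Set<String> result = new HashSet<>(zoo);
        result.add(animal);
        return result;
    }

    static Set<String> setRemove(String animal, Set<String> zoo) {
        Set<String> result = new HashSet<>(zoo);
        result.remove(animal);
        return result;
    }

    // Sum aggregate: + and - commute, so either order gives the same result.
    static Integer sumAdd(Integer v, Integer total) { return total + v; }
    static Integer sumSub(Integer v, Integer total) { return total - v; }

    public static void main(String[] args) {
        Set<String> zoo = Collections.singleton("lion");
        // Duplicated event: old value "lion" is replaced by new value "lion".
        Set<String> subtractorFirst = update(zoo, "lion", "lion",
            ExecutionOrderDemo::setAdd, ExecutionOrderDemo::setRemove, true);
        Set<String> adderFirst = update(zoo, "lion", "lion",
            ExecutionOrderDemo::setAdd, ExecutionOrderDemo::setRemove, false);
        Integer totalSubtractorFirst = update(10, 5, 5,
            ExecutionOrderDemo::sumAdd, ExecutionOrderDemo::sumSub, true);
        Integer totalAdderFirst = update(10, 5, 5,
            ExecutionOrderDemo::sumAdd, ExecutionOrderDemo::sumSub, false);
        System.out.println(subtractorFirst);      // [lion]
        System.out.println(adderFirst);           // []
        System.out.println(totalSubtractorFirst); // 10
        System.out.println(totalAdderFirst);      // 10
    }
}
```

Only the Set aggregate depends on the order. This is why a defined order matters for non-commutative adder/subtractor pairs and not for sums or counts.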
> As [~mjsax] notes on [https://github.com/confluentinc/kafka-streams-examples/issues/380]
>  
> {quote}the implementation used the same order since 0.10.0 release and it was never changed
> {quote}
> so making this behavior part of the public contract would simply formalize what has already been stable since 0.10.0.
> Cost:
>  * Limits future options. If Kafka Streams ever needed to change the order of execution (or make it nondeterministic instead of the current hard-coded order), that would be a breaking change.
> Benefit:
>  * Encourages wider use of the KGroupedTable#aggregate method (the current lack of a defined order prevents safely using aggregate with non-commutative adder/subtractor functions)
>  * Simplifies reasoning about how to use KGroupedTable#aggregate (knowing that a given order can be relied upon makes the method itself easier to understand)
>  
>  
> ----
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)