Posted to users@kafka.apache.org by Fq Public <fq...@gmail.com> on 2021/01/29 00:50:46 UTC

Clarify “the order of execution for the subtractor and adder is not defined”

The Streams DSL documentation
<https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating>
includes a caveat about using the aggregate method to transform a
KGroupedTable → KTable, as follows (emphasis mine):

When subsequent non-null values are received for a key (e.g., UPDATE), then
(1) the subtractor is called with the old value as stored in the table and
(2) the adder is called with the new value of the input record that was
just received. *The order of execution for the subtractor and adder is not
defined.*

My interpretation of that last line is that one of three things can
happen:

   1. subtractor can be called before adder
   2. adder can be called before subtractor
   3. adder and subtractor could be called at the same time

Here is the question I'm looking to get answered:
*Are all 3 scenarios above actually possible when using the aggregate
method on a KGroupedTable?*
Or am I misinterpreting the documentation? For my use-case (detailed
below), it would be ideal if the subtractor was always called before the
adder.
------------------------------

*Why is this question important?*

If the adder and subtractor are non-commutative operations and the order in
which they are executed can vary, you can end up with different results
depending on that order. A useful example of such an operation is
aggregating records into a Set:

.aggregate[Set[Animal]](Set.empty)(
  adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
  subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
)

In this example, for duplicated events, if the adder is called before the
subtractor you would end up removing the value entirely from the set (which
would be problematic for most use-cases I imagine).
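
To make the duplicate-event case concrete, here is a minimal sketch in plain
Java with no Kafka dependency. The class and method names are hypothetical
stand-ins for the adder and subtractor above, not Kafka Streams API; the
sketch only applies the two functions in both orders for an update whose old
and new value are the same.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the Set-based aggregate; not part of Kafka Streams.
class SetAggregateOrder {

    // Mirrors the adder: returns a copy of the aggregate with the value added.
    static Set<String> add(Set<String> aggregate, String value) {
        Set<String> result = new HashSet<>(aggregate);
        result.add(value);
        return result;
    }

    // Mirrors the subtractor: returns a copy of the aggregate with the value removed.
    static Set<String> subtract(Set<String> aggregate, String value) {
        Set<String> result = new HashSet<>(aggregate);
        result.remove(value);
        return result;
    }

    public static void main(String[] args) {
        Set<String> zoo = add(new HashSet<>(), "lion");

        // Duplicate update: the old value and the new value are both "lion".
        Set<String> subtractThenAdd = add(subtract(zoo, "lion"), "lion");
        Set<String> addThenSubtract = subtract(add(zoo, "lion"), "lion");

        System.out.println(subtractThenAdd); // prints [lion]
        System.out.println(addThenSubtract); // prints []
    }
}
```

With subtract-then-add the animal stays in the set; with add-then-subtract it
is lost, which is the outcome I'm worried about.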
------------------------------

*Why am I doubting the documentation (assuming my interpretation of it is
correct)?*

   1. Seems like an unusual design choice
   2. When I've run unit tests (using TopologyTestDriver and
   EmbeddedKafka), I always see the subtractor is called before the adder.
   Unfortunately, if there is some kind of race condition involved, it's
   entirely possible that I would never hit the other scenarios.
   3. I did try looking into the kafka-streams codebase as well. The
   KTableProcessorSupplier that calls the user-supplied adder/subtractor
   functions appears to be this one:
   https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81
   and on line 92, you can even see a comment saying "first try to remove
   the old value". Unfortunately, during my own testing, I saw that the
   process function itself is called twice: first with a Change<V> value
   that includes only the old value, and then again with a Change<V> value
   that includes only the new value. I haven't been able to dig deep
   enough to find the internal code that generates the old value record
   and the new value record (upon receiving an update) to determine if it
   actually produces those records in that order.

Re: Clarify “the order of execution for the subtractor and adder is not defined”

Posted by Fq Public <fq...@gmail.com>.
Ahh okay I see (re: producer config), I hadn't thought about that yet. I'm
using the StreamsConfig.EXACTLY_ONCE processing guarantee for my Kafka
Streams app, so I should be protected against re-ordering of messages in
case of retries (since `enable.idempotence` is automatically set to true
under the StreamsConfig.EXACTLY_ONCE setting).

Thank you Matthias, this has been very helpful!

Cheers,
Fq

On Tue, 2 Feb 2021 at 02:45, Matthias J. Sax <mj...@apache.org> wrote:

> About the producer config:
>
> By default, the producer will retry sending record batches in case a
> send failed. Furthermore, the producer default configs allow for 5
> parallel in-flight requests per TCP connection. Thus, if one request fails
> while others succeed, and the producer retries the send request, the
> retried request would effectively get re-ordered.
>
> To avoid re-ordering, you can either set config
> `max.in.flight.requests.per.connection = 1`, or you enable idempotent
> writes. I.e., the default config does _not_ guard against reordering in
> case of retries, but is optimized for increased throughput.

Re: Clarify “the order of execution for the subtractor and adder is not defined”

Posted by "Matthias J. Sax" <mj...@apache.org>.
About the producer config:

By default, the producer will retry sending record batches in case a
send failed. Furthermore, the producer default configs allow for 5
parallel in-flight requests per TCP connection. Thus, if one request fails
while others succeed, and the producer retries the send request, the
retried request would effectively get re-ordered.

To avoid re-ordering, you can either set config
`max.in.flight.requests.per.connection = 1`, or you enable idempotent
writes. I.e., the default config does _not_ guard against reordering in
case of retries, but is optimized for increased throughput.
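
As a rough sketch of those two options (not a definitive setup), the
properties below could be merged into a Kafka Streams configuration. The
class name is hypothetical, and plain string keys are used so the example
needs no Kafka jars; it assumes the `producer.` prefix that Kafka Streams
uses to route a setting to its internal producer.

```java
import java.util.Properties;

// Hypothetical helper; plain string keys keep the sketch free of Kafka dependencies.
class OrderingSafeProducerProps {

    // Option 1: allow only one in-flight request per connection, so a retried
    // batch cannot be overtaken by a later one (at the cost of throughput).
    static Properties singleInFlight() {
        Properties props = new Properties();
        props.put("producer.max.in.flight.requests.per.connection", "1");
        return props;
    }

    // Option 2: enable idempotent writes on the producer.
    static Properties idempotent() {
        Properties props = new Properties();
        props.put("producer.enable.idempotence", "true");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(singleInFlight());
        System.out.println(idempotent());
    }
}
```

Either set of properties would be combined with the rest of the application
config (application id, bootstrap servers, serdes) before the topology is
started.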

>> I take it this means that, if I were joining a KStream with this KTable, it
>> is entirely possible that a KStream record could join with the KTable in a
>> state where the subtractor has been already executed but the adder has not
>> yet been executed for the corresponding KTable-update event?

Yes, that would be possible (even without interleaved records, as we
always send two records). We could maybe close this gap, but the fix
would depend on a proper implementation of the .equals() method for the
key type. Feel free to create a Jira ticket for this case.
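
A small simulation of that gap, in plain Java and under simplifying
assumptions: the event encoding ("-x" for a subtract record, "+x" for an add
record, "?" for a stream-side lookup) and the class name are made up for
illustration and do not reflect Kafka Streams internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical replay of one aggregate's changelog; not Kafka Streams code.
class IntermediateStateSketch {

    // Applies events in order and snapshots the aggregate at every "?" event,
    // i.e. at each point where a stream record would look up the table.
    static List<Set<String>> replay(Set<String> initial, List<String> events) {
        Set<String> aggregate = new HashSet<>(initial);
        List<Set<String>> observed = new ArrayList<>();
        for (String event : events) {
            if (event.startsWith("-")) {
                aggregate.remove(event.substring(1)); // subtract record
            } else if (event.startsWith("+")) {
                aggregate.add(event.substring(1));    // add record
            } else {
                observed.add(new HashSet<>(aggregate)); // lookup sees current state
            }
        }
        return observed;
    }

    public static void main(String[] args) {
        // An update lion -> tiger arrives as two records; one lookup lands in between.
        Set<String> initial = new HashSet<>(Arrays.asList("lion"));
        List<String> events = Arrays.asList("-lion", "?", "+tiger", "?");
        System.out.println(replay(initial, events)); // prints [[], [tiger]]
    }
}
```

The first lookup observes the aggregate after the subtract but before the
add, which is the state a joining record could see.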

We have work in progress to improve DSL operator semantics. In the
meantime, if the current implementation of the DSL operators is an
issue, you could fall back to the PAPI (Processor API) and implement
custom operators that address the corner cases that are important for
your use case.


-Matthias

On 2/1/21 10:39 AM, Fq Public wrote:
> The other follow-up question I have is related to this part of your
> statement:
> ```
> also, even if the key did not change, both records might be interleaved by
> other records...
> ```
> I take it this means that, if I were joining a KStream with this KTable, it
> is entirely possible that a KStream record could join with the KTable in a
> state where the subtractor has been already executed but the adder has not
> yet been executed for the corresponding KTable-update event?
> 
> Cheers,
> FQ

Re: Clarify “the order of execution for the subtractor and adder is not defined”

Posted by Fq Public <fq...@gmail.com>.
The other follow-up question I have is related to this part of your
statement:
```
also, even if the key did not change, both records might be interleaved by
other records...
```
I take it this means that, if I were joining a KStream with this KTable, it
is entirely possible that a KStream record could join with the KTable in a
state where the subtractor has already been executed but the adder has not
yet been executed for the corresponding KTable-update event?

Cheers,
FQ

On Mon, 1 Feb 2021 at 11:53, Fq Public <fq...@gmail.com> wrote:

> Hiya Matthias, Alexandre,
>
> Thanks for your detailed responses. Your explanation about why the order
> of execution in the `KGroupedTable.aggregate` method does not matter (so
> much as what's happening in the `KTable.groupBy` method) makes sense to me.
>
> I have one follow-up question regarding this part of your statement:
>
> The only guarantee we can provide is (given that you configured the
> producer correctly to avoid re-ordering during send()) that if the
> grouping key does not change, the send of the old and new value will not
> be re-ordered relative to each other. The order of the send is
> hard-coded in the upstream processor though:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99
>
> What would be an example of an incorrect configuration I could apply that
> would result in re-ordering during the `send()` call?
> As you said, "the order of the send is hard-coded in the upstream
> processor" so I'm struggling to come up with an example here.
>
> Cheers,
> FQ
>
> On Fri, 29 Jan 2021 at 17:11, Matthias J. Sax <mj...@apache.org> wrote:
>
>> What Alex says.
>>
>> The order is hard-coded (i.e., not a race condition), but there is no
>> guarantee that we won't change the order in future releases without
>> notice (i.e., it's not a public contract and we don't need to do a KIP to
>> change it). I guess there would be a Jira about it... But as a matter of
>> fact, it does not really matter (details below).
>>
>> For the three scenarios you mentioned, the 3rd one cannot happen though:
>> We execute an aggregator in a single thread (per shard) and thus we
>> call either the adder or the subtractor first.
>>
>>
>>
>> > 1. Seems like an unusual design choice
>>
>> Why do you think so?
>>
>>
>>
>> > first with a Change<V> value that includes only
>> > the old value and then the process function is called again with a
>> > Change<V> value that includes only the new value.
>>
>> In general, both records might be processed by different threads and
>> thus we cannot send only one record. It's just that the TTD
>> (TopologyTestDriver) simulates a single-threaded execution, so both
>> records always end up in the same processor.
>>
>> Cf
>>
>> https://stackoverflow.com/questions/54372134/topologytestdriver-sending-incorrect-message-on-ktable-aggregations
>>
>> However, the order actually only matters if both records really end up
>> in the same processor (if the grouping key did not change during the
>> upstream update).
>>
>> Furthermore, the order actually depends not on the downstream aggregate
>> implementation, but on the order of writes into the repartition topic
>> of the `groupBy()`, and with multiple parallel upstream processors, those
>> writes are interleaved anyway. Thus, in general, you should think of the
>> "add" and "subtract" parts as independent entities and not make any
>> assumptions about their order (also, even if the key did not change, both
>> records might be interleaved by other records...)
>>
>> The only guarantee we can provide is (given that you configured the
>> producer correctly to avoid re-ordering during send()) that if the
>> grouping key does not change, the send of the old and new value will not
>> be re-ordered relative to each other. The order of the send is
>> hard-coded in the upstream processor though:
>>
>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99
>>
>> Thus, the order of the downstream aggregate processor is actually
>> meaningless.
>>
>>
>>
>> -Matthias
>>
>> On 1/29/21 6:50 AM, Alexandre Brasil wrote:
>> > From the source code in KGroupedTableImpl, the subtractor is always
>> > called before the adder. By not guaranteeing the order, I think the devs
>> > meant that it might change in future versions of Kafka Streams (although
>> > I'd think it's unlikely to).
>> >
>> > I have use cases similar to your example, and that phrase worries me
>> > a bit too. :)
>> >
>> > On Thu, Jan 28, 2021, 22:31 Fq Public <fq...@gmail.com> wrote:
>> >
>> >> Hi everyone! I posted this same question on stackoverflow
>> >> <https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined>
>> >> a few days ago but didn't get any responses. Was hoping someone here
>> >> might be able to help clarify this part of the documentation for me :)
>> >>
>> >> On Thu, 28 Jan 2021 at 19:50, Fq Public <fq...@gmail.com> wrote:
>> >>
>> >>> The Streams DSL documentation
>> >>> <https://kafka.apache.org/27/documentation/streams/developer-guide/dsl-api.html#aggregating>
>> >>> includes a caveat about using the aggregate method to transform a
>> >>> KGroupedTable → KTable, as follows (emphasis mine):
>> >>>
>> >>> When subsequent non-null values are received for a key (e.g., UPDATE),
>> >>> then (1) the subtractor is called with the old value as stored in the
>> >>> table and (2) the adder is called with the new value of the input record
>> >>> that was just received. *The order of execution for the subtractor and
>> >>> adder is not defined.*
>> >>>
>> >>> My interpretation of that last line implies that one of three things
>> >>> can happen:
>> >>>
>> >>>    1. subtractor can be called before adder
>> >>>    2. adder can be called before subtractor
>> >>>    3. adder and subtractor could be called at the same time
>> >>>
>> >>> Here is the question I'm looking to get answered:
>> >>> *Are all 3 scenarios above actually possible when using the aggregate
>> >>> method on a KGroupedTable?*
>> >>> Or am I misinterpreting the documentation? For my use-case (detailed
>> >>> below), it would be ideal if the subtractor was always called before
>> >>> the adder.
>> >>> ------------------------------
>> >>>
>> >>> *Why is this question important?*
>> >>>
>> >>> If the adder and subtractor are non-commutative operations and the
>> >>> order in which they are executed can vary, you can end up with different
>> >>> results depending on the order of execution of adder and subtractor. An
>> >>> example of a useful non-commutative operation would be something like if
>> >>> we’re aggregating records into a Set:
>> >>>
>> >>> .aggregate[Set[Animal]](Set.empty)(
>> >>>   adder = (zooKey, animalValue, setOfAnimals) => setOfAnimals + animalValue,
>> >>>   subtractor = (zooKey, animalValue, setOfAnimals) => setOfAnimals - animalValue
>> >>> )
>> >>>
>> >>> In this example, for duplicated events, if the adder is called before
>> >>> the subtractor you would end up removing the value entirely from the set
>> >>> (which would be problematic for most use-cases I imagine).
>> >>> ------------------------------
>> >>>
>> >>> *Why am I doubting the documentation (assuming my interpretation of
>> >>> it is correct)?*
>> >>>
>> >>>    1. Seems like an unusual design choice
>> >>>    2. When I've run unit tests (using TopologyTestDriver and
>> >>>    EmbeddedKafka), I always see the subtractor is called before the
>> >>>    adder. Unfortunately, if there is some kind of race condition involved,
>> >>>    it's entirely possible that I would never hit the other scenarios.
>> >>>    3. I did try looking into the kafka-streams codebase as well. The
>> >>>    KTableProcessorSupplier that calls the user-supplied
>> adder/subtracter
>> >>>    functions appears to be this one:
>> >>>
>> >>
>> https://github.com/apache/kafka/blob/18547633697a29b690a8fb0c24e2f0289ecf8eeb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L81
>> >> and
>> >>>    on line 92, you can even see a comment saying "first try to remove
>> >> the old
>> >>>    value". Unfortunately, during my own testing, I saw was that the
>> >>>    process function itself is called twice; first with a Change<V>
>> value
>> >> that
>> >>>    includes only the old value and then the process function is called
>> >>>    again with a Change<V> value that includes only the new value. I
>> >>>    haven't been able to dig deep enough to find the internal code
>> that is
>> >>>    generating the old value record and the new value record (upon
>> >> receiving an
>> >>>    update) to determine if it actually produces those records in that
>> >> order.
>> >>>
>> >>>
>> >>
>> >
>>
>

Re: Clarify “the order of execution for the subtractor and adder is not defined”

Posted by Fq Public <fq...@gmail.com>.
Hiya Matthias, Alexandre,

Thanks for your detailed responses. Your explanation of why the order of
execution in the `KGroupedTable.aggregate` method matters less than what
happens in the `KTable.groupBy` method makes sense to me.

I have one follow-up question regarding this part of your statement:

> The only guarantee we can provide is (given that you configured the
> producer correctly to avoid re-ordering during send()), that if the
> grouping key does not change, the send of the old and new value will not
> be re-ordered relative to each other. The order of the send is
> hard-coded in the upstream processor though:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99

What would be an example of an incorrect configuration I could apply that
would result in re-ordering during the `send()` call?
As you said, "the order of the send is hard-coded in the upstream
processor" so I'm struggling to come up with an example here.

Cheers,
FQ
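
For illustration, one producer setup that the Kafka producer configuration
docs flag as able to re-order records is idempotence disabled, retries
enabled, and more than one in-flight request per connection: a failed batch
can then be retried after a later batch has already been written. The sketch
below is only an assumption-laden example, not something taken from this
thread. It uses the standard producer property names as plain strings so it
needs no Kafka dependency; the object name, method names, and broker address
are placeholders.

```scala
import java.util.Properties

object ProducerOrderingSketch {
  // Shared base settings; the broker address is a placeholder.
  private def base(): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props
  }

  // Risky combination: no idempotence, retries on, several requests in flight.
  // A retried batch may be written after a batch that was sent later.
  def mayReorder(): Properties = {
    val props = base()
    props.setProperty("enable.idempotence", "false")
    props.setProperty("retries", "3")
    props.setProperty("max.in.flight.requests.per.connection", "5")
    props
  }

  // Order-preserving alternative: idempotent producer with acks=all.
  // (Without idempotence, limiting in-flight requests to 1 also avoids this.)
  def keepsOrder(): Properties = {
    val props = base()
    props.setProperty("enable.idempotence", "true")
    props.setProperty("acks", "all")
    props.setProperty("max.in.flight.requests.per.connection", "5")
    props
  }
}
```

How these settings reach the producer used internally by a Streams
application depends on how the application builds its configuration, so
treat the two methods above as reference values rather than a drop-in setup.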

On Fri, 29 Jan 2021 at 17:11, Matthias J. Sax <mj...@apache.org> wrote:

> What Alex says.
>
> The order is hard-coded (i.e., there is no race condition), but there is no
> guarantee that we won't change the order in future releases without
> notice (i.e., it's not a public contract and we don't need to do a KIP to
> change it). I guess there would be a Jira about it... But as a matter of
> fact, it does not really matter (details below).
>
> For the three scenarios you mentioned, the 3rd one cannot happen though:
> We execute an aggregator in a single thread (per shard) and thus we
> either call the adder or subtractor first.
>
>
>
> > 1. Seems like an unusual design choice
>
> Why do you think so?
>
>
>
> > first with a Change<V> value that includes only
> > the old value and then the process function is called again with a
> > Change<V> value that includes only the new value.
>
> In general, both records might be processed by different threads, and
> thus we cannot send only one record. It's just that the TTD
> (TopologyTestDriver) simulates a single-threaded execution, so both
> records always end up in the same processor.
>
> Cf
>
> https://stackoverflow.com/questions/54372134/topologytestdriver-sending-incorrect-message-on-ktable-aggregations
>
> However, the order actually only matters if both records really end up
> in the same processor (if the grouping key did not change during the
> upstream update).
>
> Furthermore, the order actually depends not on the downstream aggregate
> implementation, but on the order of writes into the repartition topic
> of the `groupBy()`, and with multiple parallel upstream processors, those
> writes are interleaved anyway. Thus, in general, you should think of the
> "add" and "subtract" parts as independent entities and not make any
> assumptions about their order (also, even if the key did not change, both
> records might be interleaved with other records...).
>
> The only guarantee we can provide is (given that you configured the
> producer correctly to avoid re-ordering during send()), that if the
> grouping key does not change, the send of the old and new value will not
> be re-ordered relative to each other. The order of the send is
> hard-coded in the upstream processor though:
>
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99
>
> Thus, the order of the downstream aggregate processor is actually
> meaningless.
>
>
>
> -Matthias
>

Re: Clarify “the order of execution for the subtractor and adder is not defined”

Posted by "Matthias J. Sax" <mj...@apache.org>.
What Alex says.

The order is hard-coded (i.e., there is no race condition), but there is no
guarantee that we won't change the order in future releases without
notice (i.e., it's not a public contract and we don't need to do a KIP to
change it). I guess there would be a Jira about it... But as a matter of
fact, it does not really matter (details below).

For the three scenarios you mentioned, the 3rd one cannot happen though:
We execute an aggregator in a single thread (per shard) and thus we
either call the adder or subtractor first.



> 1. Seems like an unusual design choice

Why do you think so?



> first with a Change<V> value that includes only
> the old value and then the process function is called again with a Change<V>
> value that includes only the new value.

In general, both records might be processed by different threads, and
thus we cannot send only one record. It's just that the TTD
(TopologyTestDriver) simulates a single-threaded execution, so both
records always end up in the same processor.

Cf
https://stackoverflow.com/questions/54372134/topologytestdriver-sending-incorrect-message-on-ktable-aggregations

However, the order actually only matters if both records really end up
in the same processor (if the grouping key did not change during the
upstream update).
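
A rough sketch of that split, for illustration only: one upstream update
becomes a "subtract" record carrying the old value and an "add" record
carrying the new value, each keyed by the grouping key derived from its own
value. `Animal`, `Change` and `split` here are simplified, hypothetical
stand-ins written for this example; they are not the Kafka Streams
internals.

```scala
object RepartitionSketch {
  // Hypothetical value type: an animal and the zoo it currently lives in.
  final case class Animal(name: String, zoo: String)

  // Simplified stand-in for a change record: only one side is ever set here.
  final case class Change[V](newValue: Option[V], oldValue: Option[V])

  // Emit the old-value record first and the new-value record second,
  // each under the grouping key computed from its own value.
  def split[K, V](oldValue: Option[V], newValue: Option[V])(groupKey: V => K): List[(K, Change[V])] = {
    val subtractRecord = oldValue.map(v => groupKey(v) -> Change[V](None, Some(v)))
    val addRecord = newValue.map(v => groupKey(v) -> Change[V](Some(v), None))
    subtractRecord.toList ++ addRecord.toList
  }

  def main(args: Array[String]): Unit = {
    // Grouping key changes: the two records carry different keys, so they
    // may be handled by different downstream processors.
    val moved = split(Some(Animal("lion", "berlin")), Some(Animal("lion", "paris")))(_.zoo)
    moved.foreach(println)

    // Grouping key unchanged: both records carry the same key.
    val same = split(Some(Animal("lion", "berlin")), Some(Animal("tiger", "berlin")))(_.zoo)
    same.foreach(println)
  }
}
```

Only in the second case do both records share a key, which is the one
situation where their relative order can be observed by a single aggregate.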

Furthermore, the order actually depends not on the downstream aggregate
implementation, but on the order of writes into the repartition topic
of the `groupBy()`, and with multiple parallel upstream processors, those
writes are interleaved anyway. Thus, in general, you should think of the
"add" and "subtract" parts as independent entities and not make any
assumptions about their order (also, even if the key did not change, both
records might be interleaved with other records...).
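
One way to follow that advice is to make the aggregate itself insensitive
to the order of "add" and "subtract". The sketch below is an assumption,
not something proposed in this thread: instead of a Set, it keeps a count
per value, so adding and subtracting commute. The object and function names
are hypothetical, and wiring them into `aggregate` is left out.

```scala
object OrderInsensitiveAggregate {
  // Aggregate state: how many times each value is currently present.
  type Counts[A] = Map[A, Int]

  // Apply a delta to one value's count; entries that reach zero are dropped,
  // negative counts are kept so an early "subtract" is not lost.
  private def bump[A](agg: Counts[A], value: A, delta: Int): Counts[A] = {
    val n = agg.getOrElse(value, 0) + delta
    if (n == 0) agg - value else agg.updated(value, n)
  }

  def add[A](agg: Counts[A], value: A): Counts[A] = bump(agg, value, 1)

  def subtract[A](agg: Counts[A], value: A): Counts[A] = bump(agg, value, -1)

  // The logical set: every value with a positive count.
  def members[A](agg: Counts[A]): Set[A] =
    agg.filter { case (_, n) => n > 0 }.keySet

  def main(args: Array[String]): Unit = {
    val start: Counts[String] = Map("lion" -> 1)

    // A duplicated event for "lion", applied in both possible orders.
    val subtractFirst = add(subtract(start, "lion"), "lion")
    val addFirst = subtract(add(start, "lion"), "lion")
    println(members(subtractFirst) == members(addFirst))

    // The plain Set version loses the value when the adder runs first.
    println((Set("lion") + "lion") - "lion")
  }
}
```

With a Set, add-then-subtract of a duplicate removes the value, while the
counting version ends in the same state in either order. In the Scala DSL
snippet from the original question, the adder could delegate along the lines
of `(zooKey, animal, counts) => add(counts, animal)`, assuming a serde for
the map is available.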

The only guarantee we can provide is (given that you configured the
producer correctly to avoid re-ordering during send()), that if the
grouping key does not change, the send of the old and new value will not
be re-ordered relative to each other. The order of the send is
hard-coded in the upstream processor though:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java#L93-L99

Thus, the order of the downstream aggregate processor is actually
meaningless.



-Matthias

On 1/29/21 6:50 AM, Alexandre Brasil wrote:
> From the source code in KGroupedTableImpl, the subtractor is always called
> before the adder. By not guaranteeing the order, I think the devs meant
> that it might change in future versions of Kafka Streams (although I'd
> think that's unlikely).
> 
> I have use cases similar to your example, and that phrase worries me a
> bit too. :)
> 

Re: Clarify “the order of execution for the subtractor and adder is not defined”

Posted by Alexandre Brasil <al...@gmail.com>.
From the source code in KGroupedTableImpl, the subtractor is always called
before the adder. By not guaranteeing the order, I think the devs meant
that it might change in future versions of Kafka Streams (although I'd
think that's unlikely).

I have use cases similar to your example, and that phrase worries me a
bit too. :)

On Thu, Jan 28, 2021, 22:31 Fq Public <fq...@gmail.com> wrote:

> Hi everyone! I posted this same question on stackoverflow
> <https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined>
> a few days ago but didn't get any responses. Was hoping someone here might
> be able to help clarify this part of the documentation for me :)
>

Re: Clarify “the order of execution for the subtractor and adder is not defined”

Posted by Fq Public <fq...@gmail.com>.
Hi everyone! I posted this same question on stackoverflow
<https://stackoverflow.com/questions/65888756/clarify-the-order-of-execution-for-the-subtractor-and-adder-is-not-defined>
a few days ago but didn't get any responses. Was hoping someone here might
be able to help clarify this part of the documentation for me :)
