Posted to dev@kafka.apache.org by "Matthias J. Sax" <mj...@apache.org> on 2022/04/22 23:43:06 UTC

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

Ivan,

are you still interested in this KIP? I think it would be a good addition.


-Matthias

On 8/16/21 5:30 PM, Matthias J. Sax wrote:
> Your point about the IQ problem is an interesting one. I missed the
> point that the "new key" would be a "superkey", and thus, it should
> always be possible to compute the original key from the superkey. (As a
> matter of fact, for windowed-table the windowed-key is also a superkey...)
> 
> I am not sure if we need to follow the "use the head idea" or if we need
> a "CompositeKey" interface? It seems we can just allow for any types and
> we can be agnostic to it?
> 
> KStream<K, V> stream = ...
> KStream<SK, V> stream2 =
>    stream.selectKey(/*set superkey*/)
>          .markAsPartitioned()
> 
> We only need a `Function<SK, K>` without any restrictions on the type,
> to map the "superkey" to the original "partition key"?
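A rough sketch of how such a reverse mapper could route an IQ lookup. This is not existing Kafka Streams API: `SuperKey`, `partitionOf` and `partitionForQuery` are hypothetical names, and `hashCode()` modulo the partition count is only a stand-in for the real partitioner.

```java
import java.util.function.Function;

final class ReverseMapperSketch {

    // Stand-in for the default partitioner (assumption: hashCode modulo partition count).
    static int partitionOf(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical superkey: the original partition key plus an extra component.
    static final class SuperKey {
        final Integer userId; // original partition key
        final String page;    // extra component added by selectKey()

        SuperKey(Integer userId, String page) {
            this.userId = userId;
            this.page = page;
        }
    }

    // Route a query: map the superkey back to the partition key, then partition that.
    static <SK, K> int partitionForQuery(SK superKey,
                                         Function<SK, K> reverseMapper,
                                         int numPartitions) {
        return partitionOf(reverseMapper.apply(superKey), numPartitions);
    }

    public static void main(String[] args) {
        SuperKey key = new SuperKey(7, "home");
        int actual = partitionOf(key.userId, 4);               // where the record really lives
        int routed = partitionForQuery(key, k -> k.userId, 4); // where the lookup is sent
        System.out.println(actual == routed);
    }
}
```

The reverse mapper is an arbitrary `Function<SK, K>`, so the key type itself needs no special interface.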
> 
> 
> Do you propose to provide the "reverse mapper" via the
> `markAsPartitioned()` method (or config object), or via the IQ methods?
> Not sure which one is better?
> 
> 
> However, I am not sure if it would solve the join problem? At least not
> easily: if one has two KStream<Tuple,...> and one is properly
> partitioned by `Tuple` while the other one is "marked-as-partitioned",
> the join would just fail. -- Similar for a stream-table join. -- The
> only fix would be to do the re-partitioning anyway, effectively ignoring
> the "user hint", but it seems to defeat the purpose? Again, I would
> argue that it is ok to not handle this case, but leave it as the
> responsibility for the user to not mess it up.
> 
> 
> -Matthias
> 
> On 8/9/21 2:32 PM, Ivan Ponomarev wrote:
>> Hi Matthias and Sophie!
>>
>> ==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==
>>
>> I don't have a strong opinion here; both Sophie's and Matthias' points
>> look convincing to me.
>>
>> I think we should estimate the following: what is the probability that
>> we will ever need to extend `selectKey` etc. with a config for the
>> purposes other than `markAsPartitioned`?
>>
>> If we find this probability high, then it's just a refactoring to
>> deprecate overloads with `Named` and introduce overloads with dedicated
>> configs, and we should do it this way.
>>
>> If it's low or zero, maybe it's better not to mess with the existing
>> APIs and to introduce a single `markAsPartitioned()` method, which
>> itself can be easily deprecated if we find a better solution later!
>>
>>
>> ==2. The IQ problem==
>>
>>> it then has to be the case that
>>
>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>
>>
>> Sophie, you got this wrong, and Matthias already explained why.
>>
>> The actual required property for the mapping function is:
>>
>> \forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))
>>
>> or, by contraposition,
>>
>> \forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )
>>
>>
>> (look at the whiteboard photo that I attached to the KIP).
>>
>> There is a big class of such mappings: key -> Tuple(key, anyValue). This
>> is actually what we often do before aggregation, and this mapping does
>> not require repartition.
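The property above can be checked by brute force on a finite sample of keys. A minimal sketch, assuming integer keys and a modulo partitioner; `preservesPartitioning` is a hypothetical helper, not part of any Kafka API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.ToIntFunction;

final class PartitionPreservationCheck {

    // True if, for every pair in the sample, map(k1) = map(k2) implies
    // partition(k1) = partition(k2).
    static <K, M> boolean preservesPartitioning(List<K> sample,
                                                Function<K, M> map,
                                                ToIntFunction<K> partition) {
        for (K k1 : sample) {
            for (K k2 : sample) {
                boolean sameMappedKey = map.apply(k1).equals(map.apply(k2));
                boolean samePartition =
                    partition.applyAsInt(k1) == partition.applyAsInt(k2);
                if (sameMappedKey && !samePartition) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7);
        ToIntFunction<Integer> partition = k -> Math.floorMod(k, 4);

        // key -> Tuple(key, anyValue): distinct keys never collide, so the property holds.
        Function<Integer, List<Object>> toTuple = k -> Arrays.asList(k, "anyValue");
        System.out.println(preservesPartitioning(keys, toTuple, partition));

        // key -> key / 2 merges keys 0 and 1, which live in different partitions.
        Function<Integer, Integer> halve = k -> k / 2;
        System.out.println(preservesPartitioning(keys, halve, partition));
    }
}
```

The tuple mapping passes because it is injective; the halving mapping fails because it merges keys from different partitions.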
>>
>> But of course we can extract the original key from Tuple(key, anyValue),
>> and this can save IQ and joins!
>>
>> This is what I'm talking about when I talk about the 'CompositeKey' idea.
>>
>> We can do the following:
>>
>> 1. implement a 'partitioner wrapper' that recognizes tuples
>> (CompositeKeys) and uses only the 'head' to calculate the partition,
>>
>> 2. implement
>>
>> selectCompositeKey(BiFunction<K, V, T> tailSelector) {
>>    selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v)));
>>    //MARK_AS_PARTITIONED call here,
>>    //but this call is an implementation detail and we do not expose
>>    //markAsPartitioned publicly!
>> }
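The 'partitioner wrapper' from step 1 could look roughly like this. It is only a sketch: `CompositeKey` and both partition functions are hypothetical, and `hashCode()` modulo the partition count stands in for the real partitioner.

```java
final class CompositeKeySketch {

    // Hypothetical composite key: only `head` decides the partition.
    static final class CompositeKey<H, T> {
        final H head;
        final T tail;

        CompositeKey(H head, T tail) {
            this.head = head;
            this.tail = tail;
        }
    }

    // Stand-in for the default partitioner (assumption: hashCode modulo partition count).
    static int defaultPartition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // "Partitioner wrapper": unwrap composite keys and partition by head only.
    static int partition(Object key, int numPartitions) {
        if (key instanceof CompositeKey) {
            return defaultPartition(((CompositeKey<?, ?>) key).head, numPartitions);
        }
        return defaultPartition(key, numPartitions);
    }

    public static void main(String[] args) {
        int plain = partition(42, 8);
        int composite = partition(new CompositeKey<>(42, "any tail"), 8);
        System.out.println(plain == composite);
    }
}
```

Because the tail never enters the hash, the plain key and every composite key built on it land in the same partition, so both IQ and the producer side can use the same wrapper.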
>>
>> WDYT? (it's just a brainstorming idea)
>>
>> 09.08.2021 2:38, Matthias J. Sax wrote:
>>> Hi,
>>>
>>> I originally had a similar thought about `markAsPartitioned()` vs
>>> extending `selectKey()` et al. with a config. While I agree that it
>>> might be conceptually cleaner to use a config object, I did not propose
>>> it as the API impact (deprecating stuff and adding new stuff) is quite
>>> big... If we think it's an acceptable price to pay, I am ok with it
>>> though.
>>>
>>> I also think that `markAsPartitioned()` could actually be
>>> categorized as an operator... We don't expose it in the API as
>>> first-class citizen atm, but in fact we have two types of `KStream` -- a
>>> "PartitionedKStream" and a "NonPartitionedKStream". Thus,
>>> `markAsPartitioned()` can be seen as a "cast operator" that converts the
>>> one into the other.
>>>
>>> I also think that the raised concern about "forgetting to remove
>>> `markAsPartitioned()`" might not be very strong though. If you have
>>> different places in the code that link stuff together, a call to eg.
>>> `selectKey().markAsPartitioned()` must always go together. If you have
>>> some other place in the code that gets a `KStream` passed as input, it
>>> would be "invalid" to blindly call `markAsPartitioned()` as you don't
>>> know anything about the upstream code. Of course, it requires some
>>> "coding discipline" to follow this pattern... Also, users can shoot
>>> themselves in the foot if they want with the config object pattern,
>>> too: if you get a `KStream` passed in, you can skip repartitioning via
>>> `selectKey((k,v) -> k, Config.markAsPartitioned())`. -- Thus, I still
>>> slightly prefer to add `markAsPartitioned()` as an operator.
>>>
>>> (Maybe we should have exposed a `PartitionedKStream` as first class
>>> object to begin with... Hard to introduce now I guess...)
>>>
>>>
>>> The concern about IQ is interesting -- I did not realize this impact.
>>> Thanks for bringing it up.
>>>
>>>> a repartition would be a no-op, ie that the stream (and its partitioning)
>>>> would be the same whether or not a repartition is inserted. For this to
>>>> be true, it then has to be the case that
>>>>
>>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>>
>>> @Sophie: I don't think this statement is correct. `markAsPartitioned()`
>>> only means that the existing partitioning ensures that all messages with
>>> the same new key are still in the same partition. Ie, it cannot happen
>>> that two records with the same new key are in different partitions.
>>>
>>> However, if you would physically repartition on the new key using the
>>> same hash-function as for the old key, there is no guarantee that the
>>> same partitions would be picked... And that is why IQ breaks downstream.
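A small illustration of the mismatch described above, under simplifying assumptions: integer keys, a list standing in for the new key, and `hashCode()` modulo the partition count standing in for the real hash function. `partitionOf` is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.List;

final class IqMismatchSketch {

    // Stand-in for the default partitioner (assumption: hashCode modulo partition count).
    static int partitionOf(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;

        Integer oldKey = 7;                         // key the input is partitioned by
        List<Integer> newKey = Arrays.asList(7, 2); // new key after selectKey(), no repartition

        // The record never moved, so it still lives where the old key put it.
        int actualPartition = partitionOf(oldKey, numPartitions);

        // A lookup that only knows the new key hashes that instead.
        int guessedPartition = partitionOf(newKey, numPartitions);

        System.out.println("actual=" + actualPartition + ", guessed=" + guessedPartition);
    }
}
```

With these stand-ins the record sits in partition 3 while the lookup by new key points at partition 0, which is the downstream IQ problem in miniature.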
>>>
>>> Btw: using `markAsPartitioned()` could also be an issue for joins for
>>> the same reason... I want to call out, that the Jira tickets that did
>>> raise the concern about unnecessary repartitioning are about downstream
>>> aggregations though...
>>>
>>> Last but not least: we actually have a similar situation for
>>> windowed-aggregations: The result of a window aggregation is partitioned
>>> by the "plain key": if we write the result into a topic using the same
>>> partitioning function, we would write to different partitions... (I
>>> guess it was never an issue so far, as we don't have KIP-300 in place
>>> yet...)
>>>
>>> It's also not an issue for IQ, because we know the plain key, and thus
>>> can route to the right task.
>>>
>>>
>>> About a solution: I think it might be ok to say we don't need to solve
>>> this problem, but it's the user's responsibility to take IQ into account.
>>> Ie, if they want to use IQ downstream, they need to repartition: for this
>>> case, repartitioning is _NOT_ unnecessary... The same argument seems to
>>> apply for the join case I mentioned above. -- Given that
>>> `markAsPartitioned()` is an advanced feature, it seems ok to leave it to
>>> the user to use correctly (we should of course call it out in the docs!).
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>>
>>> On 8/7/21 7:45 PM, Sophie Blee-Goldman wrote:
>>>> Before I dive into the question of IQ and the approaches you proposed,
>>>> can you just elaborate on the problem itself? By definition, the
>>>> `markAsPartitioned` flag means that a repartition would be a no-op, ie
>>>> that the stream (and its partitioning) would be the same whether or not
>>>> a repartition is inserted. For this to be true, it then has to be the
>>>> case that
>>>>
>>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>>>
>>>> The left-hand side of the above is precisely how we determine the
>>>> partition number that a key belongs to when using IQ. It shouldn't
>>>> matter whether the user is querying a key in a store upstream of the
>>>> key-changing operation or a mapped key downstream of it -- either way
>>>> we just apply the given Partitioner.
>>>>
>>>> See StreamsMetadataState#getKeyQueryMetadataForKey
>>>> <https://github.com/apache/kafka/blob/6854eb8332d6ef1f1c6216d2f67d6e146b1ef60f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L283>
>>>> for where this happens.
>>>>
>>>>
>>>> If we're concerned that users might try to abuse the new
>>>> `markAsPartitioned` feature, or accidentally misuse it, then we could
>>>> add a runtime check that applies the Partitioner associated with that
>>>> subtopology to the key being processed and the mapped key result
>>>> to assert that they do indeed match. Imo this is probably overkill, just
>>>> putting it out there.
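A minimal sketch of the runtime check suggested here, assuming `hashCode()` modulo the partition count as a stand-in for the subtopology's Partitioner. `checked` and `partitionOf` are hypothetical names, and the check implements the strict equality exactly as stated in this message.

```java
import java.util.function.Function;

final class RuntimePartitionCheck {

    // Stand-in for the Partitioner (assumption: hashCode modulo partition count).
    static int partitionOf(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Wraps a key mapper; throws if the mapped key hashes to a different partition.
    static <K, M> Function<K, M> checked(Function<K, M> mapper, int numPartitions) {
        return key -> {
            M mapped = mapper.apply(key);
            if (partitionOf(key, numPartitions) != partitionOf(mapped, numPartitions)) {
                throw new IllegalStateException(
                    "Mapped key " + mapped + " does not stay in the partition of " + key);
            }
            return mapped;
        };
    }

    public static void main(String[] args) {
        // Adding the partition count keeps the key in the same partition.
        Function<Integer, Integer> plusFour = checked(k -> k + 4, 4);
        System.out.println(plusFour.apply(3));

        // Adding one moves the key to another partition, so the check fires.
        Function<Integer, Integer> plusOne = checked(k -> k + 1, 4);
        try {
            plusOne.apply(3);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The check costs one extra hash per record, which is part of why it may be overkill for a feature aimed at latency-sensitive users.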
>>>>
>>>> On Sat, Aug 7, 2021 at 1:42 PM Ivan Ponomarev <ip...@mail.ru.invalid> wrote:
>>>>
>>>>> Hi Sophie,
>>>>>
>>>>> thanks for your reply! So your proposal is:
>>>>>
>>>>> 1). For each key-changing operation, deprecate the existing overloads
>>>>> that accept a Named, and replace them with overloads that take an
>>>>> operator-specific config object.
>>>>> 2). Add `markAsPartitioned` flag to these configs.
>>>>>
>>>>> IMO, this looks much better than the original proposal, I like it very
>>>>> much and I think I will rewrite the KIP soon. I absolutely agree with
>>>>> your points. Repartition logic is not a part of the public contract, and
>>>>> it's much better to give it correct hints instead of telling explicitly
>>>>> what it should do.
>>>>>
>>>>> ...
>>>>>
>>>>> Since we're generating such bright ideas, maybe we should also
>>>>> brainstorm the interactive query problem?
>>>>>
>>>>> The problem is that interactive queries will not work properly when
>>>>> `markAsPartitioned` is used. Although original key and mapped key will
>>>>> be in the same partition, we will no longer be able to guess this
>>>>> partition given the mapped key only.
>>>>>
>>>>> The possible approaches are:
>>>>>
>>>>> 1) Give up and don't use interactive queries together with
>>>>> `markAsPartitioned`. This is what I assume for now. But can we do better?
>>>>>
>>>>> 2) Maybe we should ask the user to provide a 'reverse mapping' that will
>>>>> allow IQ to restore the original key in order to choose the correct
>>>>> partition. We can place this mapping in our new configuration object. Of
>>>>> course, there is no way for KStreams to verify at compile time or startup
>>>>> time that this function is actually the reverse mapping that extracts
>>>>> the old key from the new one. Users will forget to provide this
>>>>> function. Users will provide wrong functions. This all looks too
>>>>> fragile.
>>>>>
>>>>> 3) Maybe there can be a completely different approach. Let's introduce a
>>>>> new entity -- composite keys, consisting of "head" and "tail". The
>>>>> partition for the composite key is calculated based on its 'head' value
>>>>> only. If we provide a key mapping in the form key -> CompositeKey(key,
>>>>> tail), then it's obvious that we do not need a repartition. When an
>>>>> interactive query needs to guess the partition for CompositeKey, it just
>>>>> extracts its head and calculates the correct partition.
>>>>>
>>>>> We can select CompositeKey before groupByKey() and aggregation
>>>>> operations, and this will not involve repartition. And IQ will work.
>>>>>
>>>>> Is this too daring an idea, WDYT? My concern: will it cover all the cases
>>>>> when we want to choose a different key, but also avoid repartition?
>>>>>
>>>>> Regards,
>>>>>
>>>>> Ivan
>>>>>
>>>>>
>>>>>
>>>>> 06.08.2021 23:19, Sophie Blee-Goldman wrote:
>>>>>> Hey Ivan
>>>>>>
>>>>>> I completely agree that adding it as a config to Grouped/Joined/etc
>>>>>> isn't much better, I was just listing it for completeness, and that I
>>>>>> would prefer to make it a configuration of the key-changing operation
>>>>>> itself -- that's what I meant by
>>>>>>
>>>>>>> a better alternative might be to introduce this ... to the config
>>>>>>> object of the operator that's actually doing the key changing
>>>>>>> operation
>>>>>>
>>>>>>
>>>>>> I personally believe this is the semantically "correct" way to approach
>>>>>> this, since "preserves partitioning" or "does not preserve partitioning"
>>>>>> is a property of a key-changing operation and not an operation on the
>>>>>> stream itself. Also, this way the user need only tell Streams which
>>>>>> operations do or do not preserve the partitioning, and Streams can
>>>>>> figure out where to insert a repartition in the topology as it does
>>>>>> today.
>>>>>>
>>>>>> Otherwise, we're rendering this particularly useful feature of the DSL
>>>>>> -- automatic repartitioning -- pretty much useless, since the user now
>>>>>> has to figure out whether a repartition is needed. On top of that, they
>>>>>> need to have some understanding of where and when this internal
>>>>>> automatic repartitioning logic is going to insert that repartition in
>>>>>> order to cancel it in the appropriate place. Which is pretty
>>>>>> unfortunate, since that logic is not part of the public contract: it
>>>>>> can change at any time, for example as it did when we introduced the
>>>>>> repartition merging optimization.
>>>>>>
>>>>>> All that said, those are valid concerns regarding the expansion of the
>>>>>> API's surface area. Since none of the key-changing operations currently
>>>>>> have a config object like some other operations (for example Grouped or
>>>>>> Consumed, etc), this would double the number of overloads. But maybe
>>>>>> this is a good opportunity to fix that problem, rather than keep
>>>>>> digging ourselves into holes by trying to work around it.
>>>>>>
>>>>>> It looks like all of those key-changing operations have two overloads
>>>>>> at the moment, one with no parameters beyond the operation itself (eg
>>>>>> KeyValueMapper for #selectKey) and the other with an additional Named
>>>>>> parameter, which is itself another kind of configuration. What if we
>>>>>> instead deprecate the existing overloads that accept a Named, and
>>>>>> replace them with overloads that take an operator-specific config
>>>>>> object like we do elsewhere (eg Grouped for #groupByKey)? Then we can
>>>>>> have both Named and this `markAsPartitioned` flag be part of the
>>>>>> general config object, which (a) does not expand the API surface area
>>>>>> at all in this KIP, and (b) also protects future KIPs from needing to
>>>>>> have this same conversation over and over, because we can now stick
>>>>>> any additional operator properties into that same config object.
>>>>>>
>>>>>> WDYT? By the way, the above idea (introducing a single config object to
>>>>>> wrap all operator properties) was also raised by John Roesler a while
>>>>>> back. Let's hope he hasn't changed his mind since then :)
>>>>>>
>>>>>>
>>>>>> On Fri, Aug 6, 2021 at 3:01 AM Ivan Ponomarev <iponomarev@mail.ru.invalid> wrote:
>>>>>>
>>>>>>> Hi Matthias,
>>>>>>>
>>>>>>> Concerning the naming: I like `markAsPartitioned`, because it describes
>>>>>>> what this operation is actually doing!
>>>>>>>
>>>>>>> Hi Sophie,
>>>>>>>
>>>>>>> I see the concern about poor code cohesion. We declare key mapping in
>>>>>>> one place of code, then later in another place we say
>>>>>>> "markAsPartitioned()". When we change the code six months later, we
>>>>>>> might forget to remove markAsPartitioned(), especially if it's placed in
>>>>>>> another method or class. But I don't understand why you propose to
>>>>>>> include this config into Grouped/Joined/StreamJoined, because from this
>>>>>>> point of view it's not a better solution?
>>>>>>>
>>>>>>> The best approach regarding the cohesion might be to add an extra
>>>>>>> 'preservePartition' flag to every key-changing operation, that is
>>>>>>>
>>>>>>> 1) selectKey
>>>>>>> 2) map
>>>>>>> 3) flatMap
>>>>>>> 4) transform
>>>>>>> 5) flatTransform
>>>>>>>
>>>>>>> in order to tell whether the provided mapping requires repartitioning.
>>>>>>> Indeed, this is a property of the mapping operation, not of the grouping
>>>>>>> one! BTW: the idea of adding an extra parameter to `selectKey` was once
>>>>>>> suggested by John Roesler.
>>>>>>>
>>>>>>> Arguments in favour of this approach: 1) better code cohesion from the
>>>>>>> point of view of the user, 2) 'smarter' code (the decision is taken
>>>>>>> depending on metadata provided for all the upstream mappings), 3)
>>>>>>> overall safer for the user.
>>>>>>>
>>>>>>> Arguments against: invasive KStreams API change, 5 more method
>>>>>>> overloads. Further on, when we add a new key-changing operation to
>>>>>>> KStream, we must add an overloaded version with 'preservePartition'.
>>>>>>> When we add a new overloaded version for an existing operation, we
>>>>>>> actually might need to add two or more overloaded versions. This will
>>>>>>> soon become a mess.
>>>>>>>
>>>>>>> I thought that since `markAsPartitioned` is intended for advanced users,
>>>>>>> they will use it with care. When you're in a position where every
>>>>>>> serialization/deserialization round matters for the latency, you're
>>>>>>> extremely careful with the topology and you will not thoughtlessly add
>>>>>>> new key-changing operations without controlling how it's going to change
>>>>>>> the overall topology.
>>>>>>>
>>>>>>> By the way, if we later find a better solution, it's much easier to
>>>>>>> deprecate a single `markAsPartitioned` operation than 5 method overloads.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 04.08.2021 4:23, Sophie Blee-Goldman wrote:
>>>>>>>> Do we really need a whole DSL operator for this? I think the original
>>>>>>>> name for this operator -- `cancelRepartition()` -- is itself a sign
>>>>>>>> that this is not an operation on the stream itself but rather a
>>>>>>>> command/request to whichever operator would have otherwise triggered
>>>>>>>> this repartition.
>>>>>>>>
>>>>>>>> What about instead adding a new field to the Grouped/Joined/StreamJoined
>>>>>>>> config objects that signals them to skip the repartitioning?
>>>>>>>>
>>>>>>>> The one downside to this specific proposal is that you would then need
>>>>>>>> to specify this for every stateful operation downstream of the
>>>>>>>> key-changing operation. So a better alternative might be to introduce
>>>>>>>> this `skipRepartition` field, or whatever we want to call it, to the
>>>>>>>> config object of the operator that's actually doing the key changing
>>>>>>>> operation which is apparently preserving the partitioning.
>>>>>>>>
>>>>>>>> Imo this would be more "safe" relative to the current proposal, as the
>>>>>>>> user has to explicitly consider whether every key changing operation
>>>>>>>> is indeed preserving the partitioning. Otherwise you could code up a
>>>>>>>> topology with several key changing operations at the beginning which
>>>>>>>> do require repartitioning. Then you get to the end of the topology and
>>>>>>>> insert one final key changing operation that doesn't, assume you can
>>>>>>>> just cancel the repartition, and suddenly you're wondering why your
>>>>>>>> results are all screwed up.
>>>>>>>>
>>>>>>>> On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the KIP Ivan!
>>>>>>>>>
>>>>>>>>> I think it's a good feature to give advanced users more control, and
>>>>>>>>> allow them to build more efficient applications.
>>>>>>>>>
>>>>>>>>> Not sure if I like the proposed name though (the good old "naming
>>>>>>>>> things" discussion :))
>>>>>>>>>
>>>>>>>>> Did you consider alternatives? What about
>>>>>>>>>
>>>>>>>>>      - markAsPartitioned()
>>>>>>>>>      - markAsKeyed()
>>>>>>>>>      - skipRepartition()
>>>>>>>>>
>>>>>>>>> Not sure if there are other ideas for a good name?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 6/24/21 7:45 AM, Ivan Ponomarev wrote:
>>>>>>>>>> Hello,
>>>>>>>>>>
>>>>>>>>>> I'd like to start a discussion for KIP-759:
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> This is an offshoot of the discussion of KIP-655 for a `distinct`
>>>>>>>>>> operator, which turned out to be a separate proposal.
>>>>>>>>>>
>>>>>>>>>> The proposal is quite trivial, however, we still might consider
>>>>>>>>>> alternatives (see 'Possible Alternatives' section).
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>>
>>>>>>>>>> Ivan
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

Posted by Levani Kokhreidze <le...@gmail.com>.
Hi all,

Since there was no activity around this KIP, I’ll pick it up in the coming weeks and continue the discussion.

Best,
Levani

> On 27. Apr 2022, at 22:50, Matthias J. Sax <mj...@apache.org> wrote:
> 
> Let's wait a couple of days to give Ivan a chance to reply. If he does not reply, feel free to pick it up.
> 
> 
> -Matthias
> 
> On 4/26/22 3:58 AM, Levani Kokhreidze wrote:
>> Hi,
>> Sorry, maybe I am jumping the gun here, but if by any chance this KIP becomes dormant, I'd be interested in picking it up.
>> Levani
>>>>>>>> 
>>>>>>>> The problem is that interactive queries will not work properly when
>>>>>>>> `markAsPartitioned` is used. Although original key and mapped key will
>>>>>>>> be in the same partition, we will no longer be able to guess this
>>>>>>>> partition given the mapped key only.
>>>>>>>> 
>>>>>>>> The possible approaches are:
>>>>>>>> 
>>>>>>>> 1) Give up and don't use interactive queries together with
>>>>>>>> `markAsPartitioned`. This is what I assume for now. But can we do better?
>>>>>>>> 
>>>>>>>> 2) Maybe we should ask the user to provide 'reverse mapping' that will
>>>>>>>> allow IQ to restore the original key in order to choose the correct
>>>>>>>> partition. We can place this mapping in our new configuration
>>>>>>>> object. Of
>>>>>>>> course, there is no way for KStreams to verify at compile time/startup
>>>>>>>> time that this function is actually the reverse mapping that
>>>>>>>> extracts
>>>>>>>> the old key from the new one. Users will forget to provide this
>>>>>>>> function. Users will provide wrong functions. This all looks too
>>>>>>>> fragile.
>>>>>>>> 
>>>>>>>> 3) Maybe there can be a completely different approach. Let's
>>>>>>>> introduce a
>>>>>>>> new entity -- composite keys, consisting of "head" and "tail". The
>>>>>>>> partition for the composite key is calculated based on its 'head' value
>>>>>>>> only. If we provide a key mapping in form key -> CompositeKey(key,
>>>>>>>> tail), then it's obvious that we do not need a repartition. When an
>>>>>>>> interactive query needs to guess the partition for CompositeKey, it
>>>>>>>> just
>>>>>>>> extracts its head and calculates the correct partition.
>>>>>>>> 
>>>>>>>> We can select CompositeKey before groupByKey() and aggregation
>>>>>>>> operations, and this will not involve repartition. And IQ will work.
>>>>>>>> 
>>>>>>>> Is this too daring an idea, WDYT? My concern: will it cover all the cases
>>>>>>>> when we want to choose a different key, but also avoid repartition?
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> 
>>>>>>>> Ivan
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 06.08.2021 23:19, Sophie Blee-Goldman wrote:
>>>>>>>>> Hey Ivan
>>>>>>>>> 
>>>>>>>>> I completely agree that adding it as a config to Grouped/Joined/etc
>>>>>>>>> isn't
>>>>>>>>> much better, I was just
>>>>>>>>> listing it for completeness, and that I would prefer to make it a
>>>>>>>>> configuration of the key-changing
>>>>>>>>> operation itself -- that's what I meant by
>>>>>>>>> 
>>>>>>>>> a better alternative might be to introduce this ... to the config
>>>>>>>>> object
>>>>>>>> of
>>>>>>>>>> the operator that's actually
>>>>>>>>> 
>>>>>>>>> doing the key changing operation
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> I personally believe this is the semantically "correct" way to
>>>>>>>>> approach
>>>>>>>>> this, since "preserves partitioning"
>>>>>>>>> or "does not preserve partitioning" is a property of a key-changing
>>>>>>>>> operation and not an operation on the
>>>>>>>>> stream itself. Also, this way the user need only tell Streams which
>>>>>>>>> operations do or do not preserve the
>>>>>>>>> partitioning, and Streams can figure out where to insert a
>>>>>>>>> repartition in
>>>>>>>>> the topology as it does today.
>>>>>>>>> 
>>>>>>>>> Otherwise, we're rendering this particularly useful feature of the
>>>>>>>>> DSL --
>>>>>>>>> automatic repartitioning -- pretty
>>>>>>>>> much useless, since the user now has to figure out whether a
>>>>>>>>> repartition
>>>>>>>> is
>>>>>>>>> needed. On top of that, they
>>>>>>>>> need to have some understanding of where and when this internal
>>>>>>>>> automatic
>>>>>>>>> repartitioning logic is going
>>>>>>>>> to insert that repartition in order to cancel it in the appropriate
>>>>>>>> place.
>>>>>>>>> Which is pretty unfortunate, since
>>>>>>>>> that logic is not part of the public contract: it can change at any
>>>>>>>>> time,
>>>>>>>>> for example as it did when we introduced
>>>>>>>>> the repartition merging optimization.
>>>>>>>>> 
>>>>>>>>> All that said, those are valid concerns regarding the expansion of the
>>>>>>>>> API's surface area. Since none of
>>>>>>>>> the key-changing operations currently have a config object like some
>>>>>>>> other
>>>>>>>>> operations (for example Grouped
>>>>>>>>> or Consumed, etc), this would double the number of overloads. But
>>>>>>>>> maybe
>>>>>>>>> this is a good opportunity to fix
>>>>>>>>> that problem, rather than keep digging ourselves into holes by
>>>>>>>>> trying to
>>>>>>>>> work around it.
>>>>>>>>> 
>>>>>>>>> It looks like all of those key-changing operations have two
>>>>>>>>> overloads at
>>>>>>>>> the moment, one with no parameters
>>>>>>>>> beyond the operation itself (eg KeyValueMapper for #selectKey) and the
>>>>>>>>> other with an additional Named
>>>>>>>>> parameter, which is itself another kind of configuration. What if we
>>>>>>>>> instead deprecate the existing overloads
>>>>>>>>> that accept a Named, and replace them with overloads that take an
>>>>>>>>> operator-specific config object like we do
>>>>>>>>> elsewhere (eg Grouped for #groupByKey). Then we can have both Named
>>>>>>>>> and
>>>>>>>>> this  `markAsPartitioned` flag
>>>>>>>>> be part of the general config object, which (a) does not expand the
>>>>>>>>> API
>>>>>>>>> surface area at all in this KIP, and (b)
>>>>>>>>> also protects future KIPs from needing to have this same conversation
>>>>>>>> over
>>>>>>>>> and over, because we can now
>>>>>>>>> stick any additional operator properties into that same config object.
>>>>>>>>> 
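A rough sketch of what such an operator-level config object could look like, loosely following the static-factory style of the existing `Grouped` config. All names here (`KeyChangeConfig`, `partitioningPreserved`, `needsRepartition`) are hypothetical and only for illustration; they are not an existing or agreed-upon Kafka Streams API:

```java
import java.util.List;

// Hypothetical config object for a key-changing operator; not a Kafka Streams class.
final class KeyChangeConfig {
    private final String name;
    private final boolean partitioningPreserved;

    private KeyChangeConfig(String name, boolean partitioningPreserved) {
        this.name = name;
        this.partitioningPreserved = partitioningPreserved;
    }

    // Carries the operator name, so no separate Named overload is needed.
    static KeyChangeConfig as(String name) {
        return new KeyChangeConfig(name, false);
    }

    // User hint: this key change keeps every record in its current partition.
    KeyChangeConfig markAsPartitioned() {
        return new KeyChangeConfig(name, true);
    }

    String name() { return name; }

    boolean partitioningPreserved() { return partitioningPreserved; }

    // Toy stand-in for the DSL's decision: a downstream stateful operator needs
    // a repartition if any key change since the last repartition lacks the hint.
    static boolean needsRepartition(List<KeyChangeConfig> upstreamKeyChanges) {
        for (KeyChangeConfig config : upstreamKeyChanges) {
            if (!config.partitioningPreserved()) {
                return true;
            }
        }
        return false;
    }
}
```

The point of the sketch is only that the hint lives next to the key-changing operation, while the decision about where to repartition stays with the framework.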
>>>>>>>>> WDYT? By the way, the above idea (introducing a single config
>>>>>>>>> object to
>>>>>>>>> wrap all operator properties) was also
>>>>>>>>> raised by John Roesler a while back. Let's hope he hasn't changed his
>>>>>>>> mind
>>>>>>>>> since then :)
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Fri, Aug 6, 2021 at 3:01 AM Ivan Ponomarev
>>>>>>>>> <iponomarev@mail.ru.invalid
>>>>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Matthias,
>>>>>>>>>> 
>>>>>>>>>> Concerning the naming: I like `markAsPartitioned`, because it
>>>>>>>>>> describes
>>>>>>>>>> what this operation is actually doing!
>>>>>>>>>> 
>>>>>>>>>> Hi Sophie,
>>>>>>>>>> 
>>>>>>>>>> I see the concern about poor code cohesion. We declare key mapping in
>>>>>>>>>> one place of code, then later in another place we say
>>>>>>>>>> "markAsPartitioned()". When we change the code six months later, we
>>>>>>>>>> might forget to remove markAsPartitioned(), especially if it's
>>>>>>>>>> placed in
>>>>>>>>>> another method or class. But I don't understand why you propose to
>>>>>>>>>> include this config into Grouped/Joined/StreamJoined, because from
>>>>>>>>>> this
>>>>>>>>>> point of view it's not a better solution?
>>>>>>>>>> 
>>>>>>>>>> The best approach regarding the cohesion might be to add an extra
>>>>>>>>>> 'preservePartition' flag to every key-changing operation, that is
>>>>>>>>>> 
>>>>>>>>>> 1) selectKey
>>>>>>>>>> 2) map
>>>>>>>>>> 3) flatMap
>>>>>>>>>> 4) transform
>>>>>>>>>> 5) flatTransform
>>>>>>>>>> 
>>>>>>>>>> in order to tell if the provided mapping requires a repartition or not.
>>>>>>>>>> Indeed, this is a mapping operation property, not grouping one!
>>>>>>>>>> BTW: the
>>>>>>>>>> idea of adding extra parameter to `selectKey` was once suggested by John
>>>>>>>>>> Roesler.
>>>>>>>>>> 
>>>>>>>>>> Arguments in favour of this approach: 1) better code cohesion
>>>>>>>>>> from the
>>>>>>>>>> point of view of the user, 2) 'smarter' code (the decision is taken
>>>>>>>>>> depending on metadata provided for all the upstream mappings), 3)
>>>>>>>>>> overall safer for the user.
>>>>>>>>>> 
>>>>>>>>>> Arguments against: invasive KStreams API change, 5 more method
>>>>>>>>>> overloads. Further on, when we add a new key-changing operation to
>>>>>>>>>> KStream, we must add an overloaded version with 'preservePartition'.
>>>>>>>>>> When we add a new overloaded version for existing operation, we
>>>>>>>>>> actually
>>>>>>>>>> might need to add two or more overloaded versions. This will soon
>>>>>>>>>> become
>>>>>>>>>> a mess.
>>>>>>>>>> 
>>>>>>>>>> I thought that since `markAsPartitioned` is intended for advanced
>>>>>>>>>> users,
>>>>>>>>>> they will use it with care. When you're in a position where every
>>>>>>>>>> serialization/deserialization round matters for the latency, you're
>>>>>>>>>> extremely careful with the topology and you will not thoughtlessly
>>>>>>>>>> add
>>>>>>>>>> new key-changing operations without controlling how it's going to
>>>>>>>>>> change
>>>>>>>>>> the overall topology.
>>>>>>>>>> 
>>>>>>>>>> By the way, if we later find a better solution, it's much easier to
>>>>>>>>>> deprecate a single `markAsPartitioned` operation than 5 method
>>>>>>>> overloads.
>>>>>>>>>> 
>>>>>>>>>> What do you think?
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 04.08.2021 4:23, Sophie Blee-Goldman wrote:
>>>>>>>>>>> Do we really need a whole DSL operator for this? I think the
>>>>>>>>>>> original
>>>>>>>>>> name
>>>>>>>>>>> for this
>>>>>>>>>>> operator -- `cancelRepartition()` -- is itself a sign that this
>>>>>>>>>>> is not
>>>>>>>> an
>>>>>>>>>>> operation on the
>>>>>>>>>>> stream itself but rather a command/request to whichever operator
>>>>>>>>>>> would
>>>>>>>>>> have
>>>>>>>>>>> otherwise triggered this repartition.
>>>>>>>>>>> 
>>>>>>>>>>> What about instead adding a new field to the
>>>>>>>> Grouped/Joined/StreamJoined
>>>>>>>>>>> config
>>>>>>>>>>> objects that signals them to skip the repartitioning?
>>>>>>>>>>> 
>>>>>>>>>>> The one downside to this specific proposal is that you would then
>>>>>>>>>>> need
>>>>>>>> to
>>>>>>>>>>> specify
>>>>>>>>>>> this for every stateful operation downstream of the key-changing
>>>>>>>>>> operation.
>>>>>>>>>>> So a
>>>>>>>>>>> better alternative might be to introduce this `skipRepartition`
>>>>>>>>>>> field,
>>>>>>>> or
>>>>>>>>>>> whatever we
>>>>>>>>>>> want to call it, to the config object of the operator that's
>>>>>>>>>>> actually
>>>>>>>>>> doing
>>>>>>>>>>> the key
>>>>>>>>>>> changing operation which is apparently preserving the partitioning.
>>>>>>>>>>> 
>>>>>>>>>>> Imo this would be more "safe" relative to the current proposal,
>>>>>>>>>>> as the
>>>>>>>>>> user
>>>>>>>>>>> has to
>>>>>>>>>>> explicitly consider whether every key changing operation is indeed
>>>>>>>>>>> preserving the
>>>>>>>>>>> partitioning. Otherwise you could code up a topology with several
>>>>>>>>>>> key
>>>>>>>>>>> changing
>>>>>>>>>>> operations at the beginning which do require repartitioning. Then
>>>>>>>>>>> you
>>>>>>>> get
>>>>>>>>>>> to the end
>>>>>>>>>>> of the topology and insert one final key changing operation that
>>>>>>>> doesn't,
>>>>>>>>>>> assume
>>>>>>>>>>> you can just cancel the repartition, and suddenly you're
>>>>>>>>>>> wondering why
>>>>>>>>>> your
>>>>>>>>>>> results
>>>>>>>>>>> are all screwed up
>>>>>>>>>>> 
>>>>>>>>>>> On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax <mj...@apache.org>
>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Thanks for the KIP Ivan!
>>>>>>>>>>>> 
>>>>>>>>>>>> I think it's a good feature to give advanced users more control,
>>>>>>>>>>>> and
>>>>>>>>>>>> allow them to build more efficient applications.
>>>>>>>>>>>> 
>>>>>>>>>>>> Not sure if I like the proposed name though (the good old "naming
>>>>>>>>>>>> things" discussion :))
>>>>>>>>>>>> 
>>>>>>>>>>>> Did you consider alternatives? What about
>>>>>>>>>>>> 
>>>>>>>>>>>>     - markAsPartitioned()
>>>>>>>>>>>>     - markAsKeyed()
>>>>>>>>>>>>     - skipRepartition()
>>>>>>>>>>>> 
>>>>>>>>>>>> Not sure if there are other ideas for a good name?
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>> 
>>>>>>>>>>>> On 6/24/21 7:45 AM, Ivan Ponomarev wrote:
>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I'd like to start a discussion for KIP-759:
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> This is an offshoot of the discussion of KIP-655 for a `distinct`
>>>>>>>>>>>>> operator, which turned out to be a separate proposal.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The proposal is quite trivial, however, we still might consider
>>>>>>>>>>>>> alternatives (see 'Possible Alternatives' section).
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Ivan
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 


Re: [DISCUSS] KIP-759: Unneeded repartition canceling

Posted by "Matthias J. Sax" <mj...@apache.org>.
Let's wait a couple of days to give Ivan a chance to reply. If he does 
not reply, feel free to pick it up.


-Matthias

On 4/26/22 3:58 AM, Levani Kokhreidze wrote:
> Hi,
> 
> Sorry, maybe I am jumping the gun here, but if by any chance this KIP becomes dormant, I'd be interested in picking it up.
> 
> Levani
> 
>> On 23. Apr 2022, at 02:43, Matthias J. Sax <mj...@apache.org> wrote:
>>
>> Ivan,
>>
>> are you still interested in this KIP? I think it would be a good addition.
>>
>>
>> -Matthias
>>
>> On 8/16/21 5:30 PM, Matthias J. Sax wrote:
>>> Your point about the IQ problem is an interesting one. I missed the
>>> point that the "new key" would be a "superkey", and thus, it should
>>> always be possible to compute the original key from the superkey. (As a
>>> matter of fact, for windowed-table the windowed-key is also a superkey...)
>>>
>>> I am not sure if we need to follow the "use the head idea" or if we need
>>> a "CompositeKey" interface? It seems we can just allow for any types and
>>> we can be agnostic to it?
>>>
>>> KStream<K, V> stream = ...
>>> KStream<SK, V> stream2 =
>>>    stream.selectKey(/*set superkey*/)
>>>          .markAsPartitioned()
>>>
>>> We only need a `Function<SK, K>` without any restrictions on the type,
>>> to map the "superkey" to the original "partition key"?
>>>
>>> Do you propose to provide the "reverse mapper" via the
>>> `markAsPartitioned()` method (or config object), or via the IQ methods?
>>> Not sure which one is better?
>>>
>>> However, I am not sure if it would solve the join problem? At least not
>>> easily: if one has two KStream<Tuple,...> and one is properly
>>> partitioned by `Tuple` while the other one is "marked-as-partitioned",
>>> the join would just fail. -- Similar for a stream-table join. -- The
>>> only fix would be to do the re-partitioning anyway, effectively ignoring
>>> the "user hint", but it seems to defeat the purpose? Again, I would
>>> argue that it is ok to not handle this case, but leave it as the
>>> responsibility for the user to not mess it up.
>>>
>>> -Matthias
>>>
>>> On 8/9/21 2:32 PM, Ivan Ponomarev wrote:
>>>> Hi Matthias and Sophie!
>>>>
>>>> ==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==
>>>>
>>>> I don't have a strong opinion here, both Sophie's and Matthias' points
>>>> look convincing to me.
>>>>
>>>> I think we should estimate the following: what is the probability that
>>>> we will ever need to extend `selectKey` etc. with a config for the
>>>> purposes other than `markAsPartitioned`?
>>>>
>>>> If we find this probability high, then it's just a refactoring to
>>>> deprecate overloads with `Named` and introduce overloads with dedicated
>>>> configs, and we should do it this way.
>>>>
>>>> If it's low or zero, maybe it's better not to mess with the existing
>>>> APIs and to introduce a single `markAsPartitioned()` method, which
>>>> itself can be easily deprecated if we find a better solution later!
>>>>
>>>>
>>>> ==2. The IQ problem==
>>>>
>>>>> it then has to be the case that
>>>>
>>>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>>>
>>>>
>>>> Sophie, you got this wrong, and Matthias already explained why.
>>>>
>>>> The actual required property for the mapping function is:
>>>>
>>>> \forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))
>>>>
>>>> or, by contraposition law,
>>>>
>>>> \forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )
>>>>
>>>>
>>>> (look at the whiteboard photo that I attached to the KIP).
>>>>
>>>> There is a big class of such mappings: key -> Tuple(key, anyValue). This
>>>> is actually what we often do before aggregation, and this mapping does
>>>> not require repartition.
>>>>
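The difference between the two conditions can be illustrated with a small, self-contained sketch. It assumes a toy hash-modulo partitioner (the real default partitioner hashes the serialized key bytes) and a hypothetical `Tuple` class; none of these names come from Kafka Streams. The check walks over sample records and confirms that equal new keys always come from records in the same partition, even though the partition computed from the new key itself may differ from the one computed from the old key:

```java
import java.util.List;
import java.util.Objects;

// Illustration only: hypothetical names, not part of Kafka Streams.
final class PartitionPropertyDemo {

    // New key of the form Tuple(key, anyValue).
    static final class Tuple {
        final String head;
        final String tail;

        Tuple(String head, String tail) {
            this.head = head;
            this.tail = tail;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Tuple
                && head.equals(((Tuple) o).head)
                && tail.equals(((Tuple) o).tail);
        }

        @Override
        public int hashCode() {
            return Objects.hash(head, tail);
        }
    }

    // Toy stand-in for a hash partitioner.
    static int partition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Checks map(k1) = map(k2) => partition(k1) = partition(k2)
    // for the mapping key -> Tuple(key, tail) over all sample pairs.
    static boolean holdsForTupleMapping(List<String> keys, List<String> tails, int numPartitions) {
        for (String k1 : keys) {
            for (String t1 : tails) {
                for (String k2 : keys) {
                    for (String t2 : tails) {
                        boolean sameNewKey = new Tuple(k1, t1).equals(new Tuple(k2, t2));
                        if (sameNewKey && partition(k1, numPartitions) != partition(k2, numPartitions)) {
                            return false;
                        }
                    }
                }
            }
        }
        return true;
    }
}
```

With this toy partitioner, `partition("a", 4)` and `partition(new Tuple("a", "x"), 4)` give different results, which is the reason a lookup by the mapped key alone cannot find the right partition.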
>>>> But of course we can extract the original key from Tuple(key, anyValue),
>>>> and this can save IQ and joins!
>>>>
>>>> This is what I mean by the 'CompositeKey' idea.
>>>>
>>>> We can do the following:
>>>>
>>>> 1. implement a 'partitioner wrapper' that recognizes tuples
>>>> (CompositeKeys) and uses only the 'head' to calculate the partition,
>>>>
>>>> 2. implement
>>>>
>>>> selectCompositeKey(BiFunction<K, V, T> tailSelector) {
>>>>    selectKey((k, v) -> new CompositeKey<>(k, tailSelector.apply(k, v)));
>>>>    //MARK_AS_PARTITIONED call here,
>>>>    //but this call is an implementation detail and we do not expose
>>>>    //markAsPartitioned publicly!
>>>> }
>>>>
>>>> WDYT? (it's just a brainstorming idea)
>>>>
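One way to picture the 'partitioner wrapper' from step 1, together with the `Function<SK, K>` reverse mapping discussed in this thread, is the sketch below. It is a minimal illustration under stated assumptions: `CompositeKey`, `routeBy` and `toyPartition` are hypothetical names, and the hash-modulo function only stands in for a real partitioner.

```java
import java.util.function.Function;
import java.util.function.ToIntFunction;

// Illustration only: hypothetical names, not part of Kafka Streams.
final class SuperKeyRoutingDemo {

    // Composite key: only the head takes part in partitioning.
    static final class CompositeKey<H, T> {
        final H head;
        final T tail;

        CompositeKey(H head, T tail) {
            this.head = head;
            this.tail = tail;
        }
    }

    // Toy stand-in for the partitioner applied to the original key.
    static int toyPartition(Object key) {
        return Math.floorMod(key.hashCode(), 4);
    }

    // Wraps a partitioner so that a superkey is first mapped back to the
    // original partition key, and only that key is partitioned.
    static <SK, K> ToIntFunction<SK> routeBy(Function<SK, K> toPartitionKey,
                                             ToIntFunction<Object> partitioner) {
        return superKey -> partitioner.applyAsInt(toPartitionKey.apply(superKey));
    }
}
```

Passing a function that returns the `head` field gives the CompositeKey variant; any other `Function<SK, K>` gives the more general superkey variant. In both cases an IQ lookup by the new key would land on the partition of the original key.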
>>>> 09.08.2021 2:38, Matthias J. Sax wrote:
>>>>> Hi,
>>>>>
>>>>> I originally had a similar thought about `markAsPartitioned()` vs
>>>>> extending `selectKey()` et al. with a config. While I agree that it
>>>>> might be conceptually cleaner to use a config object, I did not propose
>>>>> it as the API impact (deprecating stuff and adding new stuff) is quite
>>>>> big... If we think it's an acceptable price to pay, I am ok with it
>>>>> though.
>>>>>
>>>>> I also think that `markAsPartitioned()` could actually be
>>>>> categorized as an operator... We don't expose it in the API as
>>>>> first-class citizen atm, but in fact we have two types of `KStream` -- a
>>>>> "PartitionedKStream" and a "NonPartitionedKStream". Thus,
>>>>> `markAsPartitioned()` can be seen as a "cast operator" that converts the
>>>>> one into the other.
>>>>>
>>>>> I also think that the raised concern about "forgetting to remove
>>>>> `markAsPartitioned()`" might not be very strong though. If you have
>>>>> different places in the code that link stuff together, calls such as
>>>>> `selectKey().markAsPartitioned()` must always go together. If you have
>>>>> some other place in the code that gets a `KStream` passed as input, it
>>>>> would be "invalid" to blindly call `markAsPartitioned()` as you don't
>>>>> know anything about the upstream code. Of course, it requires some
>>>>> "coding discipline" to follow this pattern... Also, you can shoot
>>>>> yourself in the foot with the config object pattern,
>>>>> too: if you get a `KStream` passed in, you can skip repartitioning via
>>>>> `selectKey((k,v) -> k, Config.markAsPartitioned())`. -- Thus, I still
>>>>> slightly prefer to add `markAsPartitioned()` as an operator.
>>>>>
>>>>> (Maybe we should have exposed `PartitionedKStream` as a first-class
>>>>> object to begin with... Hard to introduce now I guess...)
>>>>>
>>>>>
>>>>> The concern about IQ is interesting -- I did not realize this impact.
>>>>> Thanks for bringing it up.
>>>>>
>>>>>> a repartition would be a no-op, ie that the stream (and its
>>>>>> partitioning)
>>>>>> would be the same
>>>>>> whether or not a repartition is inserted. For this to be true, it
>>>>>> then has
>>>>>> to be the case that
>>>>>>
>>>>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>>>>
>>>>> @Sophie: I don't think this statement is correct. `markAsPartitioned()`
>>>>> only means that the existing partitioning ensures that all messages with
>>>>> the same new key are still in the same partition. Ie, it cannot happen
>>>>> that two records with the same new key are in different partitions.
>>>>>
>>>>> However, if you physically repartitioned on the new key using the
>>>>> same hash-function as for the old key, there is no guarantee that the
>>>>> same partitions would be picked... And that is why IQ breaks downstream.
>>>>>
>>>>> Btw: using `markAsPartitioned()` could also be an issue for joins for
>>>>> the same reason... I want to call out, that the Jira tickets that did
>>>>> raise the concern about unnecessary repartitioning are about downstream
>>>>> aggregations though...
>>>>>
>>>>> Last but not least: we actually have a similar situation for
>>>>> windowed-aggregations: The result of a window aggregation is partitioned
>>>>> by the "plain key": if we write the result into a topic using the same
>>>>> partitioning function, we would write to different partitions... (I
>>>>> guess it was never an issue so far, as we don't have KIP-300 in place
>>>>> yet...)
>>>>>
>>>>> It's also not an issue for IQ, because we know the plain key, and thus
>>>>> can route to the right task.
>>>>>
>>>>>
>>>>> About a solution: I think it might be ok to say we don't need to solve
>>>>> this problem, but it's the user's responsibility to take IQ into account.
>>>>> Ie, if they want to use IQ downstream, they need to repartition: for this
>>>>> case, repartitioning is _NOT_ unnecessary... The same argument seems to
>>>>> apply for the join case I mentioned above. -- Given that
>>>>> `markAsPartitioned()` is an advanced feature, it seems ok to leave it to
>>>>> the user to use correctly (we should of course call it out in the docs!).
>>>>>
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>>
>>>>> On 8/7/21 7:45 PM, Sophie Blee-Goldman wrote:
>>>>>> Before I dive into the question of IQ and the approaches you
>>>>>> proposed, can
>>>>>> you just
>>>>>> elaborate on the problem itself? By definition, the `markAsPartitioned`
>>>>>> flag means that
>>>>>> a repartition would be a no-op, ie that the stream (and its
>>>>>> partitioning)
>>>>>> would be the same
>>>>>> whether or not a repartition is inserted. For this to be true, it
>>>>>> then has
>>>>>> to be the case that
>>>>>>
>>>>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>>>>>
>>>>>> The left-hand side of the above is precisely how we determine the
>>>>>> partition
>>>>>> number that
>>>>>> a key belongs to when using IQ. It shouldn't matter whether the user is
>>>>>> querying a key
>>>>>> in a store upstream of the key-changing operation or a mapped key
>>>>>> downstream of it
>>>>>> -- either way we just apply the given Partitioner.
>>>>>>
>>>>>> See StreamsMetadataState#getKeyQueryMetadataForKey
>>>>>> <https://github.com/apache/kafka/blob/6854eb8332d6ef1f1c6216d2f67d6e146b1ef60f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L283>
>>>>>>
>>>>>> for where this happens
>>>>>>
>>>>>>
>>>>>> If we're concerned that users might try to abuse the new
>>>>>> `markAsPartitioned` feature,
>>>>>> or accidentally misuse it, then we could add a runtime check that
>>>>>> applies
>>>>>> the Partitioner
>>>>>> associated with that subtopology to the key being processed and the
>>>>>> mapped
>>>>>> key result
>>>>>> to assert that they do indeed match. Imo this is probably overkill, just
>>>>>> putting it out there.
>>>>>>
>>>>>> On Sat, Aug 7, 2021 at 1:42 PM Ivan Ponomarev
>>>>>> <ip...@mail.ru.invalid>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Sophie,
>>>>>>>
>>>>>>> thanks for your reply! So your proposal is:
>>>>>>>
>>>>>>> 1). For each key-changing operation, deprecate the existing overloads
>>>>>>> that accept a Named, and replace them with overloads that take an
>>>>>>> operator-specific config object.
>>>>>>> 2). Add `markAsPartitioned` flag to these configs.
>>>>>>>
>>>>>>> IMO, this looks much better than the original proposal, I like it very
>>>>>>> much and I think I will rewrite the KIP soon. I absolutely agree with
>>>>>>> your points. Repartition logic is not a part of the public contract,
>>>>>>> and
>>>>>>> it's much better to give it correct hints instead of telling explicitly
>>>>>>> what it should do.
>>>>>>>
>>>>>>> ...
>>>>>>>
>>>>>>> Since we're generating such bright ideas, maybe we should also
>>>>>>> brainstorm the interactive query problem?
>>>>>>>
>>>>>>> The problem is that interactive queries will not work properly when
>>>>>>> `markAsPartitioned` is used. Although original key and mapped key will
>>>>>>> be in the same partition, we will no longer be able to guess this
>>>>>>> partition given the mapped key only.
>>>>>>>
>>>>>>> The possible approaches are:
>>>>>>>
>>>>>>> 1) Give up and don't use interactive queries together with
>>>>>>> `markAsPartitioned`. This is what I assume for now. But can we do better?
>>>>>>>
>>>>>>> 2) Maybe we should ask the user to provide 'reverse mapping' that will
>>>>>>> allow IQ to restore the original key in order to choose the correct
>>>>>>> partition. We can place this mapping in our new configuration
>>>>>>> object. Of
>>>>>>> course, there is no way for KStreams to verify at compile time/startup
>>>>>>> time that this function is actually the reverse mapping that
>>>>>>> extracts
>>>>>>> the old key from the new one. Users will forget to provide this
>>>>>>> function. Users will provide wrong functions. This all looks too
>>>>>>> fragile.
>>>>>>>
>>>>>>> 3) Maybe there can be a completely different approach. Let's
>>>>>>> introduce a
>>>>>>> new entity -- composite keys, consisting of "head" and "tail". The
>>>>>>> partition for the composite key is calculated based on its 'head' value
>>>>>>> only. If we provide a key mapping in form key -> CompositeKey(key,
>>>>>>> tail), then it's obvious that we do not need a repartition. When an
>>>>>>> interactive query needs to guess the partition for CompositeKey, it
>>>>>>> just
>>>>>>> extracts its head and calculates the correct partition.
>>>>>>>
>>>>>>> We can select CompositeKey before groupByKey() and aggregation
>>>>>>> operations, and this will not involve repartition. And IQ will work.
>>>>>>>
>>>>>>> Is this too daring an idea, WDYT? My concern: will it cover all the cases
>>>>>>> when we want to choose a different key, but also avoid repartition?
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Ivan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 06.08.2021 23:19, Sophie Blee-Goldman wrote:
>>>>>>>> Hey Ivan
>>>>>>>>
>>>>>>>> I completely agree that adding it as a config to Grouped/Joined/etc
>>>>>>>> isn't
>>>>>>>> much better, I was just
>>>>>>>> listing it for completeness, and that I would prefer to make it a
>>>>>>>> configuration of the key-changing
>>>>>>>> operation itself -- that's what I meant by
>>>>>>>>
>>>>>>>> a better alternative might be to introduce this ... to the config
>>>>>>>> object
>>>>>>> of
>>>>>>>>> the operator that's actually
>>>>>>>>
>>>>>>>> doing the key changing operation
>>>>>>>>
>>>>>>>>
>>>>>>>> I personally believe this is the semantically "correct" way to
>>>>>>>> approach
>>>>>>>> this, since "preserves partitioning"
>>>>>>>> or "does not preserve partitioning" is a property of a key-changing
>>>>>>>> operation and not an operation on the
>>>>>>>> stream itself. Also, this way the user need only tell Streams which
>>>>>>>> operations do or do not preserve the
>>>>>>>> partitioning, and Streams can figure out where to insert a
>>>>>>>> repartition in
>>>>>>>> the topology as it does today.
>>>>>>>>
>>>>>>>> Otherwise, we're rendering this particularly useful feature of the
>>>>>>>> DSL --
>>>>>>>> automatic repartitioning -- pretty
>>>>>>>> much useless, since the user now has to figure out whether a
>>>>>>>> repartition
>>>>>>> is
>>>>>>>> needed. On top of that, they
>>>>>>>> need to have some understanding of where and when this internal
>>>>>>>> automatic
>>>>>>>> repartitioning logic is going
>>>>>>>> to insert that repartition in order to cancel it in the appropriate
>>>>>>> place.
>>>>>>>> Which is pretty unfortunate, since
>>>>>>>> that logic is not part of the public contract: it can change at any
>>>>>>>> time,
>>>>>>>> for example as it did when we introduced
>>>>>>>> the repartition merging optimization.
>>>>>>>>
>>>>>>>> All that said, those are valid concerns regarding the expansion of the
>>>>>>>> API's surface area. Since none of
>>>>>>>> the key-changing operations currently have a config object like some
>>>>>>> other
>>>>>>>> operations (for example Grouped
>>>>>>>> or Consumed, etc), this would double the number of overloads. But
>>>>>>>> maybe
>>>>>>>> this is a good opportunity to fix
>>>>>>>> that problem, rather than keep digging ourselves into holes by
>>>>>>>> trying to
>>>>>>>> work around it.
>>>>>>>>
>>>>>>>> It looks like all of those key-changing operations have two
>>>>>>>> overloads at
>>>>>>>> the moment, one with no parameters
>>>>>>>> beyond the operation itself (eg KeyValueMapper for #selectKey) and the
>>>>>>>> other with an additional Named
>>>>>>>> parameter, which is itself another kind of configuration. What if we
>>>>>>>> instead deprecate the existing overloads
>>>>>>>> that accept a Named, and replace them with overloads that take an
>>>>>>>> operator-specific config object like we do
>>>>>>>> elsewhere (eg Grouped for #groupByKey). Then we can have both Named
>>>>>>>> and
>>>>>>>> this  `markAsPartitioned` flag
>>>>>>>> be part of the general config object, which (a) does not expand the
>>>>>>>> API
>>>>>>>> surface area at all in this KIP, and (b)
>>>>>>>> also protects future KIPs from needing to have this same conversation
>>>>>>> over
>>>>>>>> and over, because we can now
>>>>>>>> stick any additional operator properties into that same config object.
>>>>>>>>
>>>>>>>> WDYT? By the way, the above idea (introducing a single config
>>>>>>>> object to
>>>>>>>> wrap all operator properties) was also
>>>>>>>> raised by John Roesler a while back. Let's hope he hasn't changed his
>>>>>>> mind
>>>>>>>> since then :)
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Aug 6, 2021 at 3:01 AM Ivan Ponomarev
>>>>>>>> <iponomarev@mail.ru.invalid
>>>>>>>>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Matthias,
>>>>>>>>>
>>>>>>>>> Concerning the naming: I like `markAsPartitioned`, because it
>>>>>>>>> describes
>>>>>>>>> what this operation is actually doing!
>>>>>>>>>
>>>>>>>>> Hi Sophie,
>>>>>>>>>
>>>>>>>>> I see the concern about poor code cohesion. We declare key mapping in
>>>>>>>>> one place of code, then later in another place we say
>>>>>>>>> "markAsPartitioned()". When we change the code six months later, we
>>>>>>>>> might forget to remove markAsPartitioned(), especially if it's
>>>>>>>>> placed in
>>>>>>>>> another method or class. But I don't understand why you propose to
>>>>>>>>> include this config in Grouped/Joined/StreamJoined, because from
>>>>>>>>> this
>>>>>>>>> point of view it's not a better solution?
>>>>>>>>>
>>>>>>>>> The best approach regarding the cohesion might be to add an extra
>>>>>>>>> 'preservePartition' flag to every key-changing operation, that is
>>>>>>>>>
>>>>>>>>> 1) selectKey
>>>>>>>>> 2) map
>>>>>>>>> 3) flatMap
>>>>>>>>> 4) transform
>>>>>>>>> 5) flatTransform
>>>>>>>>>
>>>>>>>>> in order to tell if the provided mapping requires repartitioning or not.
>>>>>>>>> Indeed, this is a property of the mapping operation, not of the grouping one!
>>>>>>>>> BTW: the
>>>>>>>>> idea of adding an extra parameter to `selectKey` was once raised by John
>>>>>>>>> Roesler.
>>>>>>>>>
>>>>>>>>> Arguments in favour of this approach: 1) better code cohesion
>>>>>>>>> from the
>>>>>>>>> point of view of the user, 2) 'smarter' code (the decision is taken
>>>>>>>>> depending on metadata provided for all the upstream mappings), 3)
>>>>>>>>> overall safer for the user.
>>>>>>>>>
>>>>>>>>> Arguments against: invasive KStreams API change, 5 more method
>>>>>>>>> overloads. Further on, when we add a new key-changing operation to
>>>>>>>>> KStream, we must add an overloaded version with 'preservePartition'.
>>>>>>>>> When we add a new overloaded version for existing operation, we
>>>>>>>>> actually
>>>>>>>>> might need to add two or more overloaded versions. This will soon
>>>>>>>>> become
>>>>>>>>> a mess.
>>>>>>>>>
>>>>>>>>> I thought that since `markAsPartitioned` is intended for advanced
>>>>>>>>> users,
>>>>>>>>> they will use it with care. When you're in a position where every
>>>>>>>>> serialization/deserialization round matters for the latency, you're
>>>>>>>>> extremely careful with the topology and you will not thoughtlessly
>>>>>>>>> add
>>>>>>>>> new key-changing operations without controlling how it's going to
>>>>>>>>> change
>>>>>>>>> the overall topology.
>>>>>>>>>
>>>>>>>>> By the way, if we later find a better solution, it's much easier to
>>>>>>>>> deprecate a single `markAsPartitioned` operation than 5 method
>>>>>>> overloads.
>>>>>>>>>
>>>>>>>>> What do you think?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 04.08.2021 4:23, Sophie Blee-Goldman wrote:
>>>>>>>>>> Do we really need a whole DSL operator for this? I think the
>>>>>>>>>> original
>>>>>>>>> name
>>>>>>>>>> for this
>>>>>>>>>> operator -- `cancelRepartition()` -- is itself a sign that this
>>>>>>>>>> is not
>>>>>>> an
>>>>>>>>>> operation on the
>>>>>>>>>> stream itself but rather a command/request to whichever operator
>>>>>>>>>> would
>>>>>>>>> have
>>>>>>>>>> otherwise triggered this repartition.
>>>>>>>>>>
>>>>>>>>>> What about instead adding a new field to the
>>>>>>> Grouped/Joined/StreamJoined
>>>>>>>>>> config
>>>>>>>>>> objects that signals them to skip the repartitioning?
>>>>>>>>>>
>>>>>>>>>> The one downside to this specific proposal is that you would then
>>>>>>>>>> need
>>>>>>> to
>>>>>>>>>> specify
>>>>>>>>>> this for every stateful operation downstream of the key-changing
>>>>>>>>> operation.
>>>>>>>>>> So a
>>>>>>>>>> better alternative might be to introduce this `skipRepartition`
>>>>>>>>>> field,
>>>>>>> or
>>>>>>>>>> whatever we
>>>>>>>>>> want to call it, to the config object of the operator that's
>>>>>>>>>> actually
>>>>>>>>> doing
>>>>>>>>>> the key
>>>>>>>>>> changing operation which is apparently preserving the partitioning.
>>>>>>>>>>
>>>>>>>>>> Imo this would be more "safe" relative to the current proposal,
>>>>>>>>>> as the
>>>>>>>>> user
>>>>>>>>>> has to
>>>>>>>>>> explicitly consider whether every key changing operation is indeed
>>>>>>>>>> preserving the
>>>>>>>>>> partitioning. Otherwise you could code up a topology with several
>>>>>>>>>> key
>>>>>>>>>> changing
>>>>>>>>>> operations at the beginning which do require repartitioning. Then
>>>>>>>>>> you
>>>>>>> get
>>>>>>>>>> to the end
>>>>>>>>>> of the topology and insert one final key changing operation that
>>>>>>> doesn't,
>>>>>>>>>> assume
>>>>>>>>>> you can just cancel the repartition, and suddenly you're
>>>>>>>>>> wondering why
>>>>>>>>> your
>>>>>>>>>> results
>>>>>>>>>> are all screwed up
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax <mj...@apache.org>
>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for the KIP Ivan!
>>>>>>>>>>>
>>>>>>>>>>> I think it's a good feature to give advanced users more control,
>>>>>>>>>>> and
>>>>>>>>>>> allow them to build more efficient applications.
>>>>>>>>>>>
>>>>>>>>>>> Not sure if I like the proposed name though (the good old "naming
>>>>>>>>>>> things" discussion :))
>>>>>>>>>>>
>>>>>>>>>>> Did you consider alternatives? What about
>>>>>>>>>>>
>>>>>>>>>>>      - markAsPartitioned()
>>>>>>>>>>>      - markAsKeyed()
>>>>>>>>>>>      - skipRepartition()
>>>>>>>>>>>
>>>>>>>>>>> Not sure if there are other ideas for a good name?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 6/24/21 7:45 AM, Ivan Ponomarev wrote:
>>>>>>>>>>>> Hello,
>>>>>>>>>>>>
>>>>>>>>>>>> I'd like to start a discussion for KIP-759:
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> This is an offshoot of the discussion of KIP-655 for a `distinct`
>>>>>>>>>>>> operator, which turned out to be a separate proposal.
>>>>>>>>>>>>
>>>>>>>>>>>> The proposal is quite trivial, however, we still might consider
>>>>>>>>>>>> alternatives (see 'Possible Alternatives' section).
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Ivan
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>
> 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

Posted by Levani Kokhreidze <le...@gmail.com>.
Hi,

Sorry, maybe I am jumping the gun here, but if by any chance this KIP becomes dormant, I'd be interested in picking it up.

Levani

> On 23. Apr 2022, at 02:43, Matthias J. Sax <mj...@apache.org> wrote:
> 
> Ivan,
> 
> are you still interested in this KIP? I think it would be a good addition.
> 
> 
> -Matthias
> 
> On 8/16/21 5:30 PM, Matthias J. Sax wrote:
>> Your point about the IQ problem is an interesting one. I missed the
>> point that the "new key" would be a "superkey", and thus, it should
>> always be possible to compute the original key from the superkey. (As a
>> matter of fact, for windowed-table the windowed-key is also a superkey...)
>> I am not sure if we need to follow the "use the head idea" or if we need
>> a "CompositeKey" interface? It seems we can just allow for any types and
>> we can be agnostic to it?
>> KStream<K, V> stream = ...
>> KStream<SK, V> stream2 =
>>   stream.selectKey(/*set superkey*/)
>>         .markAsPartitioned()
>> We only need a `Function<SK, K>` without any restrictions on the type,
>> to map the "superkey" to the original "partition key"?
>> Do you propose to provide the "reverse mapper" via the
>> `markAsPartitioned()` method (or config object), or via the IQ methods?
>> Not sure which one is better?
>> However, I am not sure if it would solve the join problem? At least not
>> easily: if one has two KStream<Tuple,...> and one is properly
>> partitioned by `Tuple` while the other one is "marked-as-partitioned",
>> the join would just fail. -- Similar for a stream-table join. -- The
>> only fix would be to do the re-partitioning anyway, effectively ignoring
>> the "user hint", but it seems to defeat the purpose? Again, I would
>> argue that it is ok to not handle this case, but leave it as the
>> responsibility for the user to not mess it up.
>> -Matthias
>> On 8/9/21 2:32 PM, Ivan Ponomarev wrote:
>>> Hi Matthias and Sophie!
>>> 
>>> ==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==
>>> 
>>> I don't have a strong opinion here, both Sophie's and Matthias' points
>>> look convincing to me.
>>> 
>>> I think we should estimate the following: what is the probability that
>>> we will ever need to extend `selectKey` etc. with a config for the
>>> purposes other than `markAsPartitioned`?
>>> 
>>> If we find this probability high, then it's just a refactoring to
>>> deprecate overloads with `Named` and introduce overloads with dedicated
>>> configs, and we should do it this way.
>>> 
>>> If it's low or zero, maybe it's better not to mess with the existing
>>> APIs and to introduce a single `markAsPartitioned()` method, which
>>> itself can be easily deprecated if we find a better solution later!
>>> 
>>> 
>>> ==2. The IQ problem==
>>> 
>>>> it then has to be the case that
>>> 
>>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>> 
>>> 
>>> Sophie, you got this wrong, and Matthias already explained why.
>>> 
>>> The actual required property for the mapping function is:
>>> 
>>> \forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))
>>> 
>>> or, by contraposition law,
>>> 
>>> \forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )
>>> 
>>> 
>>> (look at the whiteboard photo that I attached to the KIP).
>>> 
>>> There is a big class of such mappings: key -> Tuple(key, anyValue). This
>>> is actually what we often do before aggregation, and this mapping does
>>> not require repartition.
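
The property above can be checked mechanically for the key -> Tuple(key, anyValue) class of mappings. The following is only a minimal sketch: the class name, the toy hash-modulo partitioner, and the two-element list standing in for a tuple are all assumptions made for illustration, not Kafka Streams code.

```java
import java.util.List;
import java.util.Objects;

public class SuperkeyProperty {
    // Toy partitioner (assumption): hash modulo partition count.
    // Kafka's default partitioner hashes the serialized key bytes instead.
    static int partition(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // The mapping key -> Tuple(key, anyValue); the tuple is modelled as a two-element list.
    static List<Object> map(String key, Object anyValue) {
        return List.of(key, anyValue);
    }

    // Checks, over all pairs: map(k1) = map(k2) => partition(k1) = partition(k2).
    static boolean propertyHolds(List<String> keys, List<?> values, int numPartitions) {
        for (String k1 : keys) {
            for (Object v1 : values) {
                for (String k2 : keys) {
                    for (Object v2 : values) {
                        boolean sameMappedKey = map(k1, v1).equals(map(k2, v2));
                        boolean samePartition =
                            partition(k1, numPartitions) == partition(k2, numPartitions);
                        if (sameMappedKey && !samePartition) {
                            return false;
                        }
                    }
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(propertyHolds(List.of("a", "b", "c"), List.of(1, 2), 4));
    }
}
```

The check succeeds for any choice of keys and values, because two tuples can only be equal when their first components (the original keys) are equal.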
>>> 
>>> But of course we can extract the original key from Tuple(key, anyValue),
>>> and this can save IQ and joins!
>>> 
>>> This is what I'm talking about when I talk about 'CompositeKey' idea.
>>> 
>>> We can do the following:
>>> 
>>> 1. implement a 'partitioner wrapper' that recognizes tuples
>>> (CompositeKeys) and uses only the 'head' to calculate the partition,
>>> 
>>> 2. implement
>>> 
>>> selectCompositeKey(BiFunction<K, V, T> tailSelector) {
>>>   selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v)));
>>>   //MARK_AS_PARTITIONED call here,
>>>   //but this call is an implementation detail and we do not expose
>>>   //markAsPartitioned publicly!
>>> }
>>> 
>>> WDYT? (it's just a brainstorming idea)
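
Step 1 of the idea above (a 'partitioner wrapper' that looks only at the head) could look roughly like this. It is a sketch under stated assumptions: `CompositeKey` here is a hypothetical plain class, and the base partitioner is a toy hash-modulo function rather than the real Kafka one.

```java
import java.util.Objects;

public class HeadPartitioning {
    // Hypothetical composite key: "head" is the original key, "tail" is any extra data.
    static final class CompositeKey {
        final Object head;
        final Object tail;

        CompositeKey(Object head, Object tail) {
            this.head = head;
            this.tail = tail;
        }
    }

    // Toy base partitioner (assumption): hash modulo partition count.
    static int basePartition(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // Wrapper: composite keys are partitioned by their head only,
    // all other keys are passed through to the base partitioner unchanged.
    static int partition(Object key, int numPartitions) {
        Object effectiveKey = (key instanceof CompositeKey) ? ((CompositeKey) key).head : key;
        return basePartition(effectiveKey, numPartitions);
    }

    public static void main(String[] args) {
        int plain = partition("user-1", 8);
        int composite = partition(new CompositeKey("user-1", 42), 8);
        System.out.println(plain == composite);
    }
}
```

With such a wrapper the composite key always lands in the partition of its head, so neither a repartition nor a separate reverse mapping is needed to locate it.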
>>> 
>>> 09.08.2021 2:38, Matthias J. Sax wrote:
>>>> Hi,
>>>> 
>>>> I originally had a similar thought about `markAsPartitioned()` vs
>>>> extending `selectKey()` et al. with a config. While I agree that it
>>>> might be conceptually cleaner to use a config object, I did not propose
>>>> it as the API impact (deprecating stuff and adding new stuff) is quite
>>>> big... If we think it's an acceptable price to pay, I am ok with it
>>>> though.
>>>> 
>>>> I also do think, that `markAsPartitioned()` could actually be
>>>> categorized as an operator... We don't expose it in the API as
>>>> first-class citizen atm, but in fact we have two types of `KStream` -- a
>>>> "PartitionedKStream" and a "NonPartitionedKStream". Thus,
>>>> `markAsPartitioned()` can be seen as a "cast operator" that converts the
>>>> one into the other.
>>>> 
>>>> I also think that the raised concern about "forgetting to remove
>>>> `markAsPartitioned()`" might not be very strong though. If you have
>>>> different places in the code that link stuff together, a call to eg.
>>>> `selectKey().markAsPartitioned()` must always go together. If you have
>>>> some other place in the code that gets a `KStream` passed as input, it
>>>> would be "invalid" to blindly call `markAsPartitioned()` as you don't
>>>> know anything about the upstream code. Of course, it requires some
>>>> "coding discipline" to follow this pattern... Also, users can shoot
>>>> themselves in the foot if they want with the config object pattern,
>>>> too: if you get a `KStream` passed in, you can skip repartitioning via
>>>> `selectKey((k,v) -> k, Config.markAsPartitioned())`. -- Thus, I still
>>>> slightly prefer to add `markAsPartitioned()` as an operator.
>>>> 
>>>> (Maybe we should have exposed a `PartitionedKStream` as a first-class
>>>> object to begin with... Hard to introduce now I guess...)
>>>> 
>>>> 
>>>> The concern about IQ is interesting -- I did not realize this impact.
>>>> Thanks for bringing it up.
>>>> 
>>>>> a repartition would be a no-op, ie that the stream (and its
>>>>> partitioning)
>>>>> would be the same
>>>>> whether or not a repartition is inserted. For this to be true, it
>>>>> then has
>>>>> to be the case that
>>>>> 
>>>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>>> 
>>>> @Sophie: I don't think this statement is correct. `markAsPartitioned()`
>>>> only means that the existing partitioning ensures that all messages with
>>>> the same new key are still in the same partition. Ie, it cannot happen
>>>> that two equal new keys end up in different partitions.
>>>> 
>>>> However, if you physically repartitioned on the new key using the
>>>> same hash-function as for the old key, there is no guarantee that the
>>>> same partitions would be picked... And that is why IQ breaks downstream.
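
A small worked case of the point above: hashing the new key with the same function generally does not reproduce the partition in which the record actually lives. This sketch assumes a toy hash-modulo partitioner and models the new key as a two-element list; the real default partitioner hashes serialized bytes, but the effect is the same in kind.

```java
import java.util.List;

public class RehashMismatch {
    // Toy partitioner (assumption): hashCode modulo partition count.
    static int partition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        Integer oldKey = 1;
        // New key is a superkey of the old one: (oldKey, 0).
        List<Integer> newKey = List.of(oldKey, 0);

        // Partition where the record actually lives (it was partitioned by oldKey).
        int actual = partition(oldKey, 4);
        // Partition a lookup would compute from the new key alone.
        int guessed = partition(newKey, 4);

        // Integer.hashCode(1) = 1, List.of(1, 0).hashCode() = 31 * (31 + 1) + 0 = 992
        System.out.println(actual + " vs " + guessed); // prints "1 vs 0"
    }
}
```

A lookup that only knows the new key would therefore be routed to partition 0, while the data sits in partition 1.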
>>>> 
>>>> Btw: using `markAsPartitioned()` could also be an issue for joins for
>>>> the same reason... I want to call out, that the Jira tickets that did
>>>> raise the concern about unnecessary repartitioning are about downstream
>>>> aggregations though...
>>>> 
>>>> Last but not least: we actually have a similar situation for
>>>> windowed-aggregations: The result of a window aggregation is partitioned
>>>> by the "plain key": if we write the result into a topic using the same
>>>> partitioning function, we would write to different partitions... (I
>>>> guess it was never an issue so far, as we don't have KIP-300 in place
>>>> yet...)
>>>> 
>>>> It's also not an issue for IQ, because we know the plain key, and thus
>>>> can route to the right task.
>>>> 
>>>> 
>>>> About a solution: I think it might be ok to say we don't need to solve
>>>> this problem, but it's the user's responsibility to take IQ into account.
>>>> Ie, if they want to use IQ downstream, they need to repartition: for this
>>>> case, repartitioning is _NOT_ unnecessary... The same argument seems to
>>>> apply for the join case I mentioned above. -- Given that
>>>> `markAsPartitioned()` is an advanced feature, it seems ok to leave it to
>>>> the user to use correctly (we should of course call it out in the docs!).
>>>> 
>>>> 
>>>> 
>>>> -Matthias
>>>> 
>>>> 
>>>> 
>>>> On 8/7/21 7:45 PM, Sophie Blee-Goldman wrote:
>>>>> Before I dive in to the question of IQ and the approaches you
>>>>> proposed, can
>>>>> you just
>>>>> elaborate on the problem itself? By definition, the `markAsPartitioned`
>>>>> flag means that
>>>>> a repartition would be a no-op, ie that the stream (and its
>>>>> partitioning)
>>>>> would be the same
>>>>> whether or not a repartition is inserted. For this to be true, it
>>>>> then has
>>>>> to be the case that
>>>>> 
>>>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>>>> 
>>>>> The left-hand side of the above is precisely how we determine the
>>>>> partition
>>>>> number that
>>>>> a key belongs to when using IQ. It shouldn't matter whether the user is
>>>>> querying a key
>>>>> in a store upstream of the key-changing operation or a mapped key
>>>>> downstream of it
>>>>> -- either way we just apply the given Partitioner.
>>>>> 
>>>>> See StreamsMetadataState#getKeyQueryMetadataForKey
>>>>> <https://github.com/apache/kafka/blob/6854eb8332d6ef1f1c6216d2f67d6e146b1ef60f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L283>
>>>>> 
>>>>> for where this happens
>>>>> 
>>>>> 
>>>>> If we're concerned that users might try to abuse the new
>>>>> `markAsPartitioned` feature,
>>>>> or accidentally misuse it, then we could add a runtime check that
>>>>> applies
>>>>> the Partitioner
>>>>> associated with that subtopology to the key being processed and the
>>>>> mapped
>>>>> key result
>>>>> to assert that they do indeed match. Imo this is probably overkill, just
>>>>> putting it out there.
>>>>> 
>>>>> On Sat, Aug 7, 2021 at 1:42 PM Ivan Ponomarev
>>>>> <ip...@mail.ru.invalid>
>>>>> wrote:
>>>>> 
>>>>>> Hi Sophie,
>>>>>> 
>>>>>> thanks for your reply! So your proposal is:
>>>>>> 
>>>>>> 1). For each key-changing operation, deprecate the existing overloads
>>>>>> that accept a Named, and replace them with overloads that take an
>>>>>> operator-specific config object.
>>>>>> 2). Add `markAsPartitioned` flag to these configs.
>>>>>> 
>>>>>> IMO, this looks much better than the original proposal, I like it very
>>>>>> much and I think I will rewrite the KIP soon. I absolutely agree with
>>>>>> your points. Repartition logic is not a part of the public contract,
>>>>>> and
>>>>>> it's much better to give it correct hints instead of telling explicitly
>>>>>> what it should do.
>>>>>> 
>>>>>> ...
>>>>>> 
>>>>>> Since we're generating such bright ideas, maybe we should also
>>>>>> brainstorm the interactive query problem?
>>>>>> 
>>>>>> The problem is that interactive queries will not work properly when
>>>>>> `markAsPartitioned` is used. Although original key and mapped key will
>>>>>> be in the same partition, we will no longer be able to guess this
>>>>>> partition given the mapped key only.
>>>>>> 
>>>>>> The possible approaches are:
>>>>>> 
>>>>>> 1) Give up and don't use interactive queries together with
>>>>>> `markAsPartitioned`. This is what I assume for now. But can we do better?
>>>>>> 
>>>>>> 2) Maybe we should ask the user to provide 'reverse mapping' that will
>>>>>> allow IQ to restore the original key in order to choose the correct
>>>>>> partition. We can place this mapping in our new configuration
>>>>>> object. Of
>>>>>> course, there is no way for KStreams to verify at compile time/startup
>>>>>> time that this function is actually the reverse mapping that
>>>>>> extracts
>>>>>> the old key from the new one. Users will forget to provide this
>>>>>> function. Users will provide wrong functions. This all looks too
>>>>>> fragile.
>>>>>> 
>>>>>> 3) Maybe there can be a completely different approach. Let's
>>>>>> introduce a
>>>>>> new entity -- composite keys, consisting of "head" and "tail". The
>>>>>> partition for the composite key is calculated based on its 'head' value
>>>>>> only. If we provide a key mapping in form key -> CompositeKey(key,
>>>>>> tail), then it's obvious that we do not need a repartition. When an
>>>>>> interactive query needs to guess the partition for CompositeKey, it
>>>>>> just
>>>>>> extracts its head and calculates the correct partition.
>>>>>> 
>>>>>> We can select CompositeKey before groupByKey() and aggregation
>>>>>> operations, and this will not involve repartition. And IQ will work.
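
Approach 2 above (a user-supplied reverse mapping) can be sketched as follows. All names are hypothetical and the partitioner is a toy hash-modulo function; the string superkey with a `|` separator is just an example encoding. Note that nothing in the code can verify that the supplied function really is the reverse of the key mapping, which is the fragility mentioned above.

```java
import java.util.Objects;
import java.util.function.Function;

public class ReverseMapperRouting {
    // Toy partitioner (assumption): hash modulo partition count.
    static int partition(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    // Routes a query for a superkey by first recovering the original partition key.
    static <SK, K> int partitionForQuery(SK superKey, Function<SK, K> reverseMapper,
                                         int numPartitions) {
        return partition(reverseMapper.apply(superKey), numPartitions);
    }

    public static void main(String[] args) {
        // Example superkey: original key plus a date suffix.
        String superKey = "user-1|2021-08-07";
        Function<String, String> reverseMapper = sk -> sk.substring(0, sk.indexOf('|'));

        int routed = partitionForQuery(superKey, reverseMapper, 8);
        System.out.println(routed == partition("user-1", 8));
    }
}
```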
>>>>>> 
>>>>>> Is this too daring an idea, WDYT? My concern: will it cover all the cases
>>>>>> when we want to choose a different key, but also avoid repartition?
>>>>>> 
>>>>>> Regards,
>>>>>> 
>>>>>> Ivan
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 06.08.2021 23:19, Sophie Blee-Goldman wrote:
>>>>>>> Hey Ivan
>>>>>>> 
>>>>>>> I completely agree that adding it as a config to Grouped/Joined/etc
>>>>>>> isn't
>>>>>>> much better, I was just
>>>>>>> listing it for completeness, and that I would prefer to make it a
>>>>>>> configuration of the key-changing
>>>>>>> operation itself -- that's what I meant by
>>>>>>> 
>>>>>>> a better alternative might be to introduce this ... to the config
>>>>>>> object
>>>>>> of
>>>>>>>> the operator that's actually
>>>>>>> 
>>>>>>> doing the key changing operation
>>>>>>> 
>>>>>>> 
>>>>>>> I personally believe this is the semantically "correct" way to
>>>>>>> approach
>>>>>>> this, since "preserves partitioning"
>>>>>>> or "does not preserve partitioning" is a property of a key-changing
>>>>>>> operation and not an operation on the
>>>>>>> stream itself. Also, this way the user need only tell Streams which
>>>>>>> operations do or do not preserve the
>>>>>>> partitioning, and Streams can figure out where to insert a
>>>>>>> repartition in
>>>>>>> the topology as it does today.
>>>>>>> 
>>>>>>> Otherwise, we're rendering this particularly useful feature of the
>>>>>>> DSL --
>>>>>>> automatic repartitioning -- pretty
>>>>>>> much useless, since the user now has to figure out whether a
>>>>>>> repartition
>>>>>> is
>>>>>>> needed. On top of that, they
>>>>>>> need to have some understanding of where and when this internal
>>>>>>> automatic
>>>>>>> repartitioning logic is going
>>>>>>> to insert that repartition in order to cancel it in the appropriate
>>>>>> place.
>>>>>>> Which is pretty unfortunate, since
>>>>>>> that logic is not part of the public contract: it can change at any
>>>>>>> time,
>>>>>>> for example as it did when we introduced
>>>>>>> the repartition merging optimization.
>>>>>>> 
>>>>>>> All that said, those are valid concerns regarding the expansion of the
>>>>>>> API's surface area. Since none of
>>>>>>> the key-changing operations currently have a config object like some
>>>>>> other
>>>>>>> operations (for example Grouped
>>>>>>> or Consumed, etc), this would double the number of overloads. But
>>>>>>> maybe
>>>>>>> this is a good opportunity to fix
>>>>>>> that problem, rather than keep digging ourselves into holes by
>>>>>>> trying to
>>>>>>> work around it.
>>>>>>> 
>>>>>>> It looks like all of those key-changing operations have two
>>>>>>> overloads at
>>>>>>> the moment, one with no parameters
>>>>>>> beyond the operation itself (eg KeyValueMapper for #selectKey) and the
>>>>>>> other with an additional Named
>>>>>>> parameter, which is itself another kind of configuration. What if we
>>>>>>> instead deprecate the existing overloads
>>>>>>> that accept a Named, and replace them with overloads that take an
>>>>>>> operator-specific config object like we do
>>>>>>> elsewhere (eg Grouped for #groupByKey). Then we can have both Named
>>>>>>> and
>>>>>>> this  `markAsPartitioned` flag
>>>>>>> be part of the general config object, which (a) does not expand the
>>>>>>> API
>>>>>>> surface area at all in this KIP, and (b)
>>>>>>> also protects future KIPs from needing to have this same conversation
>>>>>> over
>>>>>>> and over, because we can now
>>>>>>> stick any additional operator properties into that same config object.
>>>>>>> 
>>>>>>> WDYT? By the way, the above idea (introducing a single config
>>>>>>> object to
>>>>>>> wrap all operator properties) was also
>>>>>>> raised by John Roesler a while back. Let's hope he hasn't changed his
>>>>>> mind
>>>>>>> since then :)
>>>>>>> 
>>>>>>> 
>>>>>>> On Fri, Aug 6, 2021 at 3:01 AM Ivan Ponomarev
>>>>>>> <iponomarev@mail.ru.invalid> wrote:
>>>>>>> 
>>>>>>>> Hi Matthias,
>>>>>>>> 
>>>>>>>> Concerning the naming: I like `markAsPartitioned`, because it
>>>>>>>> describes
>>>>>>>> what this operation is actually doing!
>>>>>>>> 
>>>>>>>> Hi Sophie,
>>>>>>>> 
>>>>>>>> I see the concern about poor code cohesion. We declare key mapping in
>>>>>>>> one place of code, then later in another place we say
>>>>>>>> "markAsPartitioned()". When we change the code six months later, we
>>>>>>>> might forget to remove markAsPartitioned(), especially if it's
>>>>>>>> placed in
>>>>>>>> another method or class. But I don't understand why you propose to
>>>>>>>> include this config in Grouped/Joined/StreamJoined, because from
>>>>>>>> this
>>>>>>>> point of view it's not a better solution?
>>>>>>>> 
>>>>>>>> The best approach regarding the cohesion might be to add an extra
>>>>>>>> 'preservePartition' flag to every key-changing operation, that is
>>>>>>>> 
>>>>>>>> 1) selectKey
>>>>>>>> 2) map
>>>>>>>> 3) flatMap
>>>>>>>> 4) transform
>>>>>>>> 5) flatTransform
>>>>>>>> 
>>>>>>>> in order to tell if the provided mapping requires repartitioning or not.
>>>>>>>> Indeed, this is a property of the mapping operation, not of the grouping one!
>>>>>>>> BTW: the
>>>>>>>> idea of adding an extra parameter to `selectKey` was once raised by John
>>>>>>>> Roesler.
>>>>>>>> 
>>>>>>>> Arguments in favour of this approach: 1) better code cohesion
>>>>>>>> from the
>>>>>>>> point of view of the user, 2) 'smarter' code (the decision is taken
>>>>>>>> depending on metadata provided for all the upstream mappings), 3)
>>>>>>>> overall safer for the user.
>>>>>>>> 
>>>>>>>> Arguments against: invasive KStreams API change, 5 more method
>>>>>>>> overloads. Further on, when we add a new key-changing operation to
>>>>>>>> KStream, we must add an overloaded version with 'preservePartition'.
>>>>>>>> When we add a new overloaded version for existing operation, we
>>>>>>>> actually
>>>>>>>> might need to add two or more overloaded versions. This will soon
>>>>>>>> become
>>>>>>>> a mess.
>>>>>>>> 
>>>>>>>> I thought that since `markAsPartitioned` is intended for advanced
>>>>>>>> users,
>>>>>>>> they will use it with care. When you're in a position where every
>>>>>>>> serialization/deserialization round matters for the latency, you're
>>>>>>>> extremely careful with the topology and you will not thoughtlessly
>>>>>>>> add
>>>>>>>> new key-changing operations without controlling how it's going to
>>>>>>>> change
>>>>>>>> the overall topology.
>>>>>>>> 
>>>>>>>> By the way, if we later find a better solution, it's much easier to
>>>>>>>> deprecate a single `markAsPartitioned` operation than 5 method
>>>>>> overloads.
>>>>>>>> 
>>>>>>>> What do you think?
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 04.08.2021 4:23, Sophie Blee-Goldman wrote:
>>>>>>>>> Do we really need a whole DSL operator for this? I think the
>>>>>>>>> original
>>>>>>>> name
>>>>>>>>> for this
>>>>>>>>> operator -- `cancelRepartition()` -- is itself a sign that this
>>>>>>>>> is not
>>>>>> an
>>>>>>>>> operation on the
>>>>>>>>> stream itself but rather a command/request to whichever operator
>>>>>>>>> would
>>>>>>>> have
>>>>>>>>> otherwise triggered this repartition.
>>>>>>>>> 
>>>>>>>>> What about instead adding a new field to the
>>>>>> Grouped/Joined/StreamJoined
>>>>>>>>> config
>>>>>>>>> objects that signals them to skip the repartitioning?
>>>>>>>>> 
>>>>>>>>> The one downside to this specific proposal is that you would then
>>>>>>>>> need
>>>>>> to
>>>>>>>>> specify
>>>>>>>>> this for every stateful operation downstream of the key-changing
>>>>>>>> operation.
>>>>>>>>> So a
>>>>>>>>> better alternative might be to introduce this `skipRepartition`
>>>>>>>>> field,
>>>>>> or
>>>>>>>>> whatever we
>>>>>>>>> want to call it, to the config object of the operator that's
>>>>>>>>> actually
>>>>>>>> doing
>>>>>>>>> the key
>>>>>>>>> changing operation which is apparently preserving the partitioning.
>>>>>>>>> 
>>>>>>>>> Imo this would be more "safe" relative to the current proposal,
>>>>>>>>> as the
>>>>>>>> user
>>>>>>>>> has to
>>>>>>>>> explicitly consider whether every key changing operation is indeed
>>>>>>>>> preserving the
>>>>>>>>> partitioning. Otherwise you could code up a topology with several
>>>>>>>>> key
>>>>>>>>> changing
>>>>>>>>> operations at the beginning which do require repartitioning. Then
>>>>>>>>> you
>>>>>> get
>>>>>>>>> to the end
>>>>>>>>> of the topology and insert one final key changing operation that
>>>>>> doesn't,
>>>>>>>>> assume
>>>>>>>>> you can just cancel the repartition, and suddenly you're
>>>>>>>>> wondering why
>>>>>>>> your
>>>>>>>>> results
>>>>>>>>> are all screwed up
>>>>>>>>> 
>>>>>>>>> On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax <mj...@apache.org>
>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Thanks for the KIP Ivan!
>>>>>>>>>> 
>>>>>>>>>> I think it's a good feature to give advanced users more control,
>>>>>>>>>> and
>>>>>>>>>> allow them to build more efficient applications.
>>>>>>>>>> 
>>>>>>>>>> Not sure if I like the proposed name though (the good old "naming
>>>>>>>>>> things" discussion :))
>>>>>>>>>> 
>>>>>>>>>> Did you consider alternatives? What about
>>>>>>>>>> 
>>>>>>>>>>     - markAsPartitioned()
>>>>>>>>>>     - markAsKeyed()
>>>>>>>>>>     - skipRepartition()
>>>>>>>>>> 
>>>>>>>>>> Not sure if there are other ideas for a good name?
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> -Matthias
>>>>>>>>>> 
>>>>>>>>>> On 6/24/21 7:45 AM, Ivan Ponomarev wrote:
>>>>>>>>>>> Hello,
>>>>>>>>>>> 
>>>>>>>>>>> I'd like to start a discussion for KIP-759:
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>> 
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> This is an offshoot of the discussion of KIP-655 for a `distinct`
>>>>>>>>>>> operator, which turned out to be a separate proposal.
>>>>>>>>>>> 
>>>>>>>>>>> The proposal is quite trivial, however, we still might consider
>>>>>>>>>>> alternatives (see 'Possible Alternatives' section).
>>>>>>>>>>> 
>>>>>>>>>>> Regards,
>>>>>>>>>>> 
>>>>>>>>>>> Ivan
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>