Posted to dev@kafka.apache.org by Levani Kokhreidze <le...@gmail.com> on 2022/05/23 06:39:54 UTC

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

Hi all,

Since there has been no activity on this KIP, I’ll pick it up in the coming weeks and continue the discussion.

Best,
Levani

> On 27. Apr 2022, at 22:50, Matthias J. Sax <mj...@apache.org> wrote:
> 
> Let's wait a couple of days to give Ivan a chance to reply. If he does not reply, feel free to pick it up.
> 
> 
> -Matthias
> 
> On 4/26/22 3:58 AM, Levani Kokhreidze wrote:
>> Hi,
>> Sorry, maybe I am jumping the gun here, but if by any chance this KIP becomes dormant, I'd be interested in picking it up.
>> Levani
>>> On 23. Apr 2022, at 02:43, Matthias J. Sax <mj...@apache.org> wrote:
>>> 
>>> Ivan,
>>> 
>>> are you still interested in this KIP? I think it would be a good addition.
>>> 
>>> 
>>> -Matthias
>>> 
>>> On 8/16/21 5:30 PM, Matthias J. Sax wrote:
>>>> Your point about the IQ problem is an interesting one. I missed the
>>>> point that the "new key" would be a "superkey", and thus, it should
>>>> always be possible to compute the original key from the superkey. (As a
>>>> matter of fact, for windowed-table the windowed-key is also a superkey...)
>>>> I am not sure if we need to follow the "use the head idea" or if we need
>>>> a "CompositeKey" interface? It seems we can just allow for any types and
>>>> we can be agnostic to it?
>>>> 
>>>> KStream<K, V> stream = ...;
>>>> KStream<SK, V> stream2 =
>>>>   stream.selectKey(/*set superkey*/)
>>>>         .markAsPartitioned();
>>>> 
>>>> We only need a `Function<SK, K>` without any restrictions on the type,
>>>> to map the "superkey" to the original "partition key"?
>>>> Do you propose to provide the "reverse mapper" via the
>>>> `markAsPartitioned()` method (or config object), or via the IQ methods?
>>>> Not sure which one is better?
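To illustrate the `Function<SK, K>` idea discussed above, here is a minimal sketch in plain Java with no Kafka dependency. The class `SuperKeyRouting`, its methods, and the hash-modulo `partition` stand-in are all hypothetical (the real default partitioner hashes the serialized key bytes); the only point is that a reverse mapper lets a lookup by superkey land on the partition that was chosen for the original key.

```java
import java.util.Map;
import java.util.function.Function;

// Sketch only: none of these names exist in Kafka Streams.
final class SuperKeyRouting {

    // Assumption: a simplified stand-in for the default partitioner.
    static int partition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Route a lookup for a superkey by first mapping it back to the
    // original partition key, then applying the usual partitioner.
    static <SK, K> int partitionForSuperKey(SK superKey,
                                            Function<SK, K> reverseMapper,
                                            int numPartitions) {
        return partition(reverseMapper.apply(superKey), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 6;
        String originalKey = "user-42";
        // The superkey carries the original key plus extra information.
        Map.Entry<String, String> superKey = Map.entry(originalKey, "eu-west");

        int physical = partition(originalKey, numPartitions);
        int routed = partitionForSuperKey(superKey, sk -> sk.getKey(), numPartitions);
        System.out.println("same partition: " + (physical == routed));
    }
}
```

Whether such a mapper would be supplied through `markAsPartitioned()` or through the IQ methods is exactly the open question in the message; the sketch takes no position on it.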
>>>> However, I am not sure if it would solve the join problem? At least not
>>>> easily: if one has two KStream<Tuple,...> and one is properly
>>>> partitioned by `Tuple` while the other one is "marked-as-partitioned",
>>>> the join would just fail. -- Similar for a stream-table join. -- The
>>>> only fix would be to do the re-partitioning anyway, effectively ignoring
>>>> the "user hint", but it seems to defeat the purpose? Again, I would
>>>> argue that it is ok to not handle this case, but leave it as the
>>>> responsibility for the user to not mess it up.
>>>> -Matthias
>>>> On 8/9/21 2:32 PM, Ivan Ponomarev wrote:
>>>>> Hi Matthias and Sophie!
>>>>> 
>>>>> ==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==
>>>>> 
>>>>> I don't have a strong opinion here; both Sophie's and Matthias' points
>>>>> look convincing to me.
>>>>> 
>>>>> I think we should estimate the following: what is the probability that
>>>>> we will ever need to extend `selectKey` etc. with a config for the
>>>>> purposes other than `markAsPartitioned`?
>>>>> 
>>>>> If we find this probability high, then it's just a refactoring to
>>>>> deprecate overloads with `Named` and introduce overloads with dedicated
>>>>> configs, and we should do it this way.
>>>>> 
>>>>> If it's low or zero, maybe it's better not to mess with the existing
>>>>> APIs and to introduce a single `markAsPartitioned()` method, which
>>>>> itself can be easily deprecated if we find a better solution later!
>>>>> 
>>>>> 
>>>>> ==2. The IQ problem==
>>>>> 
>>>>>> it then has to be the case that
>>>>> 
>>>>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>>>> 
>>>>> 
>>>>> Sophie, you got this wrong, and Matthias already explained why.
>>>>> 
>>>>> The actual required property for the mapping function is:
>>>>> 
>>>>> \forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))
>>>>> 
>>>>> or, by contraposition law,
>>>>> 
>>>>> \forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )
>>>>> 
>>>>> 
>>>>> (look at the whiteboard photo that I attached to the KIP).
>>>>> 
>>>>> There is a big class of such mappings: key -> Tuple(key, anyValue). This
>>>>> is actually what we often do before aggregation, and this mapping does
>>>>> not require repartition.
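The implication above can be checked empirically on a sample of keys. The following is a minimal sketch in plain Java; `PartitionPreservationCheck` and the hash-modulo `partition` function are hypothetical stand-ins, and passing the check on a sample is evidence, not a proof, that a mapping preserves partitioning.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Sketch only: a sample-based check of
//   map(k1) = map(k2)  =>  partition(k1) = partition(k2)
final class PartitionPreservationCheck {

    // Assumption: a simplified stand-in for the default partitioner.
    static int partition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    static <K, M> boolean preservesPartitioning(List<K> keys,
                                                Function<K, M> map,
                                                int numPartitions) {
        // Remember which source partition each mapped key was first seen in.
        Map<M, Integer> partitionOfMapped = new HashMap<>();
        for (K key : keys) {
            int p = partition(key, numPartitions);
            Integer previous = partitionOfMapped.putIfAbsent(map.apply(key), p);
            if (previous != null && previous != p) {
                return false; // same mapped key observed in two partitions
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d");
        // key -> Tuple(key, anyValue): injective in the key, so it always passes.
        System.out.println(preservesPartitioning(keys, k -> Map.entry(k, "anyValue"), 4));
        // key -> length: collapses keys from different partitions together.
        System.out.println(preservesPartitioning(keys, k -> k.length(), 4));
    }
}
```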
>>>>> 
>>>>> But of course we can extract the original key from Tuple(key, anyValue),
>>>>> and this can save IQ and joins!
>>>>> 
>>>>> This is what I mean by the 'CompositeKey' idea.
>>>>> 
>>>>> We can do the following:
>>>>> 
>>>>> 1. implement a 'partitioner wrapper' that recognizes tuples
>>>>> (CompositeKeys) and uses only the 'head' to calculate the partition,
>>>>> 
>>>>> 2. implement
>>>>> 
>>>>> selectCompositeKey(BiFunction<K, V, T> tailSelector) {
>>>>>   selectKey((k, v) -> new CompositeKey<>(k, tailSelector.apply(k, v)));
>>>>>   //MARK_AS_PARTITIONED call here,
>>>>>   //but this call is an implementation detail and we do not expose
>>>>>   //markAsPartitioned publicly!
>>>>> }
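The two steps above can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `CompositeKey`, `selectCompositeKey`, and the hash-modulo `partition` function are hypothetical names that do not exist in Kafka Streams, and the per-record helper stands in for what would be a DSL operator.

```java
import java.util.Objects;
import java.util.function.BiFunction;

// Sketch only: a head-aware partitioner plus a composite-key selector.
final class CompositeKeySketch {

    // Hypothetical composite key: only `head` takes part in partitioning.
    static final class CompositeKey<H, T> {
        final H head;
        final T tail;

        CompositeKey(H head, T tail) {
            this.head = head;
            this.tail = tail;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof CompositeKey)) return false;
            CompositeKey<?, ?> other = (CompositeKey<?, ?>) o;
            return Objects.equals(head, other.head) && Objects.equals(tail, other.tail);
        }

        @Override
        public int hashCode() {
            return Objects.hash(head, tail);
        }
    }

    // Step 1: a "partitioner wrapper" that recognizes composite keys
    // and uses only the head to calculate the partition.
    static int partition(Object key, int numPartitions) {
        Object routingKey = (key instanceof CompositeKey)
                ? ((CompositeKey<?, ?>) key).head
                : key;
        return Math.floorMod(routingKey.hashCode(), numPartitions);
    }

    // Step 2: build the composite key from the old key and a computed tail.
    static <K, V, T> CompositeKey<K, T> selectCompositeKey(K key, V value,
                                                           BiFunction<K, V, T> tailSelector) {
        return new CompositeKey<>(key, tailSelector.apply(key, value));
    }

    public static void main(String[] args) {
        CompositeKey<String, Integer> ck = selectCompositeKey("user-42", 7, (k, v) -> v % 2);
        // The composite key stays in the partition of its head by construction.
        System.out.println(partition(ck, 8) == partition("user-42", 8));
    }
}
```

Because the partition depends on the head alone, no reverse mapper has to be supplied by the user, which is the fragility the composite-key idea tries to avoid.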
>>>>> 
>>>>> WDYT? (it's just a brainstorming idea)
>>>>> 
>>>>> 09.08.2021 2:38, Matthias J. Sax wrote:
>>>>>> Hi,
>>>>>> 
>>>>>> I originally had a similar thought about `markAsPartitioned()` vs
>>>>>> extending `selectKey()` et al. with a config. While I agree that it
>>>>>> might be conceptually cleaner to use a config object, I did not propose
>>>>>> it as the API impact (deprecating stuff and adding new stuff) is quite
>>>>>> big... If we think it's an acceptable price to pay, I am ok with it
>>>>>> though.
>>>>>> 
>>>>>> I also do think, that `markAsPartitioned()` could actually be
>>>>>> categorized as an operator... We don't expose it in the API as
>>>>>> first-class citizen atm, but in fact we have two types of `KStream` -- a
>>>>>> "PartitionedKStream" and a "NonPartitionedKStream". Thus,
>>>>>> `markAsPartitioned()` can be seen as a "cast operator" that converts the
>>>>>> one into the other.
>>>>>> 
>>>>>> I also think that the raised concern about "forgetting to remove
>>>>>> `markAsPartitioned()`" might not be very strong though. If you have
>>>>>> different places in the code that link stuff together, a call to e.g.
>>>>>> `selectKey().markAsPartitioned()` must always go together. If you have
>>>>>> some other place in the code that gets a `KStream` passed as input, it
>>>>>> would be "invalid" to blindly call `markAsPartitioned()` as you don't
>>>>>> know anything about the upstream code. Of course, it requires some
>>>>>> "coding discipline" to follow this pattern... Also, users can shoot
>>>>>> themselves in the foot if they want with the config object pattern,
>>>>>> too: if you get a `KStream` passed in, you can skip repartitioning via
>>>>>> `selectKey((k,v) -> k, Config.markAsPartitioned())`. -- Thus, I still
>>>>>> slightly prefer to add `markAsPartitioned()` as an operator.
>>>>>> 
>>>>>> (Maybe we should have exposed a `PartitionedKStream` as a first-class
>>>>>> object to begin with... Hard to introduce now I guess...)
>>>>>> 
>>>>>> 
>>>>>> The concern about IQ is interesting -- I did not realize this impact.
>>>>>> Thanks for bringing it up.
>>>>>> 
>>>>>>> a repartition would be a no-op, ie that the stream (and its partitioning)
>>>>>>> would be the same whether or not a repartition is inserted. For this to
>>>>>>> be true, it then has to be the case that
>>>>>>> 
>>>>>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>>>>> 
>>>>>> @Sophie: I don't think this statement is correct. A `markAsPartitioned()`
>>>>>> only means that the existing partitioning ensures that all messages with
>>>>>> the same new key are still in the same partition. Ie, it cannot happen
>>>>>> that two records with the same new key are in different partitions.
>>>>>> 
>>>>>> However, if you would physically repartition on the new key using the
>>>>>> same hash-function as for the old key, there is no guarantee that the
>>>>>> same partitions would be picked... And that is why IQ breaks downstream.
>>>>>> 
>>>>>> Btw: using `markAsPartitioned()` could also be an issue for joins for
>>>>>> the same reason... I want to call out, that the Jira tickets that did
>>>>>> raise the concern about unnecessary repartitioning are about downstream
>>>>>> aggregations though...
>>>>>> 
>>>>>> Last but not least: we actually have a similar situation for
>>>>>> windowed-aggregations: The result of a window aggregation is partitioned
>>>>>> by the "plain key": if we write the result into a topic using the same
>>>>>> partitioning function, we would write to different partitions... (I
>>>>>> guess it was never an issue so far, as we don't have KIP-300 in place
>>>>>> yet...)
>>>>>> 
>>>>>> It's also not an issue for IQ, because we know the plain key, and thus
>>>>>> can route to the right task.
>>>>>> 
>>>>>> 
>>>>>> About a solution: I think it might be ok to say we don't need to solve
>>>>>> this problem, but it's the user's responsibility to take IQ into account.
>>>>>> Ie, if they want to use IQ downstream, they need to repartition: for this
>>>>>> case, repartitioning is _NOT_ unnecessary... The same argument seems to
>>>>>> apply for the join case I mentioned above. -- Given that
>>>>>> `markAsPartitioned()` is an advanced feature, it seems ok to leave it to
>>>>>> the user to use correctly (we should of course call it out in the docs!).
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> -Matthias
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On 8/7/21 7:45 PM, Sophie Blee-Goldman wrote:
>>>>>>> Before I dive into the question of IQ and the approaches you proposed,
>>>>>>> can you just elaborate on the problem itself? By definition, the
>>>>>>> `markAsPartitioned` flag means that a repartition would be a no-op, ie
>>>>>>> that the stream (and its partitioning) would be the same whether or not
>>>>>>> a repartition is inserted. For this to be true, it then has to be the
>>>>>>> case that
>>>>>>> 
>>>>>>> Partitioner.partition(key) == Partitioner.partition(map(key))
>>>>>>> 
>>>>>>> The left-hand side of the above is precisely how we determine the
>>>>>>> partition number that a key belongs to when using IQ. It shouldn't
>>>>>>> matter whether the user is querying a key in a store upstream of the
>>>>>>> key-changing operation or a mapped key downstream of it -- either way
>>>>>>> we just apply the given Partitioner.
>>>>>>> 
>>>>>>> See StreamsMetadataState#getKeyQueryMetadataForKey
>>>>>>> <https://github.com/apache/kafka/blob/6854eb8332d6ef1f1c6216d2f67d6e146b1ef60f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L283>
>>>>>>> for where this happens.
>>>>>>> 
>>>>>>> If we're concerned that users might try to abuse the new
>>>>>>> `markAsPartitioned` feature, or accidentally misuse it, then we could
>>>>>>> add a runtime check that applies the Partitioner associated with that
>>>>>>> subtopology to the key being processed and the mapped key result to
>>>>>>> assert that they do indeed match. Imo this is probably overkill, just
>>>>>>> putting it out there.
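A runtime check of the kind suggested here could look roughly like the sketch below, written in plain Java. `PartitionGuard`, `checked`, and the hash-modulo `partition` function are hypothetical; nothing like this exists in Kafka Streams. Note that, as the replies earlier in this thread point out, requiring `partition(key) == partition(map(key))` is stricter than what `markAsPartitioned` actually needs, so a legitimate mapping such as key -> Tuple(key, anyValue) would usually fail this particular check.

```java
import java.util.function.Function;

// Sketch only: wraps a key mapper with the equality check described above.
final class PartitionGuard {

    // Assumption: a simplified stand-in for the subtopology's Partitioner.
    static int partition(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Returns a mapper that fails fast when the mapped key would hash
    // to a different partition than the key being processed.
    static <K, M> Function<K, M> checked(Function<K, M> mapper, int numPartitions) {
        return key -> {
            M mapped = mapper.apply(key);
            int before = partition(key, numPartitions);
            int after = partition(mapped, numPartitions);
            if (before != after) {
                throw new IllegalStateException(
                        "key " + key + " is in partition " + before
                                + " but mapped key " + mapped + " hashes to " + after);
            }
            return mapped;
        };
    }

    public static void main(String[] args) {
        Function<String, String> identity = checked(k -> k, 4);
        System.out.println(identity.apply("a")); // identity mapping always passes

        Function<String, Integer> shifted = checked(k -> k.hashCode() + 1, 4);
        try {
            shifted.apply("a");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```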
>>>>>>> 
>>>>>>> On Sat, Aug 7, 2021 at 1:42 PM Ivan Ponomarev <ip...@mail.ru.invalid> wrote:
>>>>>>> 
>>>>>>>> Hi Sophie,
>>>>>>>> 
>>>>>>>> thanks for your reply! So your proposal is:
>>>>>>>> 
>>>>>>>> 1). For each key-changing operation, deprecate the existing overloads
>>>>>>>> that accept a Named, and replace them with overloads that take an
>>>>>>>> operator-specific config object.
>>>>>>>> 2). Add `markAsPartitioned` flag to these configs.
>>>>>>>> 
>>>>>>>> IMO, this looks much better than the original proposal. I like it very
>>>>>>>> much and I think I will rewrite the KIP soon. I absolutely agree with
>>>>>>>> your points. Repartition logic is not a part of the public contract,
>>>>>>>> and it's much better to give it correct hints instead of telling it
>>>>>>>> explicitly what it should do.
>>>>>>>> 
>>>>>>>> ...
>>>>>>>> 
>>>>>>>> Since we're generating such bright ideas, maybe we should also
>>>>>>>> brainstorm the interactive query problem?
>>>>>>>> 
>>>>>>>> The problem is that interactive queries will not work properly when
>>>>>>>> `markAsPartitioned` is used. Although original key and mapped key will
>>>>>>>> be in the same partition, we will no longer be able to guess this
>>>>>>>> partition given the mapped key only.
>>>>>>>> 
>>>>>>>> The possible approaches are:
>>>>>>>> 
>>>>>>>> 1) Give up and don't use interactive queries together with
>>>>>>>> `markAsPartitioned`. This is what I assume for now. But can we do better?
>>>>>>>> 
>>>>>>>> 2) Maybe we should ask the user to provide a 'reverse mapping' that will
>>>>>>>> allow IQ to restore the original key in order to choose the correct
>>>>>>>> partition. We can place this mapping in our new configuration object.
>>>>>>>> Of course, there is no way for KStreams to verify at compile time or
>>>>>>>> startup time that this function is actually the reverse mapping that
>>>>>>>> extracts the old key from the new one. Users will forget to provide
>>>>>>>> this function. Users will provide wrong functions. This all looks too
>>>>>>>> fragile.
>>>>>>>> 
>>>>>>>> 3) Maybe there can be a completely different approach. Let's introduce
>>>>>>>> a new entity -- composite keys, consisting of "head" and "tail". The
>>>>>>>> partition for the composite key is calculated based on its 'head' value
>>>>>>>> only. If we provide a key mapping in the form key -> CompositeKey(key,
>>>>>>>> tail), then it's obvious that we do not need a repartition. When an
>>>>>>>> interactive query needs to guess the partition for a CompositeKey, it
>>>>>>>> just extracts its head and calculates the correct partition.
>>>>>>>> 
>>>>>>>> We can select CompositeKey before groupByKey() and aggregation
>>>>>>>> operations, and this will not involve repartition. And IQ will work.
>>>>>>>> 
>>>>>>>> Is this too daring an idea, WDYT? My concern: will it cover all the cases
>>>>>>>> when we want to choose a different key, but also avoid repartition?
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> 
>>>>>>>> Ivan
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 06.08.2021 23:19, Sophie Blee-Goldman wrote:
>>>>>>>>> Hey Ivan
>>>>>>>>> 
>>>>>>>>> I completely agree that adding it as a config to Grouped/Joined/etc
>>>>>>>>> isn't much better, I was just listing it for completeness, and that I
>>>>>>>>> would prefer to make it a configuration of the key-changing operation
>>>>>>>>> itself -- that's what I meant by
>>>>>>>>> 
>>>>>>>>>> a better alternative might be to introduce this ... to the config
>>>>>>>>>> object of the operator that's actually doing the key changing
>>>>>>>>>> operation
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> I personally believe this is the semantically "correct" way to
>>>>>>>>> approach this, since "preserves partitioning" or "does not preserve
>>>>>>>>> partitioning" is a property of a key-changing operation and not an
>>>>>>>>> operation on the stream itself. Also, this way the user need only
>>>>>>>>> tell Streams which operations do or do not preserve the partitioning,
>>>>>>>>> and Streams can figure out where to insert a repartition in the
>>>>>>>>> topology as it does today.
>>>>>>>>> 
>>>>>>>>> Otherwise, we're rendering this particularly useful feature of the
>>>>>>>>> DSL -- automatic repartitioning -- pretty much useless, since the
>>>>>>>>> user now has to figure out whether a repartition is needed. On top
>>>>>>>>> of that, they need to have some understanding of where and when this
>>>>>>>>> internal automatic repartitioning logic is going to insert that
>>>>>>>>> repartition in order to cancel it in the appropriate place. Which is
>>>>>>>>> pretty unfortunate, since that logic is not part of the public
>>>>>>>>> contract: it can change at any time, for example as it did when we
>>>>>>>>> introduced the repartition merging optimization.
>>>>>>>>> 
>>>>>>>>> All that said, those are valid concerns regarding the expansion of
>>>>>>>>> the API's surface area. Since none of the key-changing operations
>>>>>>>>> currently have a config object like some other operations (for
>>>>>>>>> example Grouped or Consumed, etc), this would double the number of
>>>>>>>>> overloads. But maybe this is a good opportunity to fix that problem,
>>>>>>>>> rather than keep digging ourselves into holes by trying to work
>>>>>>>>> around it.
>>>>>>>>> 
>>>>>>>>> It looks like all of those key-changing operations have two overloads
>>>>>>>>> at the moment, one with no parameters beyond the operation itself (eg
>>>>>>>>> KeyValueMapper for #selectKey) and the other with an additional Named
>>>>>>>>> parameter, which is itself another kind of configuration. What if we
>>>>>>>>> instead deprecate the existing overloads that accept a Named, and
>>>>>>>>> replace them with overloads that take an operator-specific config
>>>>>>>>> object like we do elsewhere (eg Grouped for #groupByKey)? Then we can
>>>>>>>>> have both Named and this `markAsPartitioned` flag be part of the
>>>>>>>>> general config object, which (a) does not expand the API surface area
>>>>>>>>> at all in this KIP, and (b) also protects future KIPs from needing to
>>>>>>>>> have this same conversation over and over, because we can now stick
>>>>>>>>> any additional operator properties into that same config object.
>>>>>>>>> 
>>>>>>>>> WDYT? By the way, the above idea (introducing a single config object
>>>>>>>>> to wrap all operator properties) was also raised by John Roesler a
>>>>>>>>> while back. Let's hope he hasn't changed his mind since then :)
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Fri, Aug 6, 2021 at 3:01 AM Ivan Ponomarev <iponomarev@mail.ru.invalid> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Matthias,
>>>>>>>>>> 
>>>>>>>>>> Concerning the naming: I like `markAsPartitioned`, because it
>>>>>>>>>> describes what this operation is actually doing!
>>>>>>>>>> 
>>>>>>>>>> Hi Sophie,
>>>>>>>>>> 
>>>>>>>>>> I see the concern about poor code cohesion. We declare key mapping in
>>>>>>>>>> one place of code, then later in another place we say
>>>>>>>>>> "markAsPartitioned()". When we change the code six months later, we
>>>>>>>>>> might forget to remove markAsPartitioned(), especially if it's placed
>>>>>>>>>> in another method or class. But I don't understand why you propose to
>>>>>>>>>> include this config in Grouped/Joined/StreamJoined, because from this
>>>>>>>>>> point of view it's not a better solution?
>>>>>>>>>> 
>>>>>>>>>> The best approach regarding the cohesion might be to add an extra
>>>>>>>>>> 'preservePartition' flag to every key-changing operation, that is
>>>>>>>>>> 
>>>>>>>>>> 1) selectKey
>>>>>>>>>> 2) map
>>>>>>>>>> 3) flatMap
>>>>>>>>>> 4) transform
>>>>>>>>>> 5) flatTransform
>>>>>>>>>> 
>>>>>>>>>> in order to tell whether the provided mapping requires repartition or
>>>>>>>>>> not. Indeed, this is a property of the mapping operation, not of the
>>>>>>>>>> grouping one! BTW: the idea of adding an extra parameter to
>>>>>>>>>> `selectKey` was once proposed by John Roesler.
>>>>>>>>>> 
>>>>>>>>>> Arguments in favour of this approach: 1) better code cohesion from the
>>>>>>>>>> point of view of the user, 2) 'smarter' code (the decision is taken
>>>>>>>>>> depending on metadata provided for all the upstream mappings), 3)
>>>>>>>>>> overall safer for the user.
>>>>>>>>>> 
>>>>>>>>>> Arguments against: invasive KStreams API change, 5 more method
>>>>>>>>>> overloads. Further on, when we add a new key-changing operation to
>>>>>>>>>> KStream, we must add an overloaded version with 'preservePartition'.
>>>>>>>>>> When we add a new overloaded version for an existing operation, we
>>>>>>>>>> actually might need to add two or more overloaded versions. This will
>>>>>>>>>> soon become a mess.
>>>>>>>>>> 
>>>>>>>>>> I thought that since `markAsPartitioned` is intended for advanced
>>>>>>>>>> users, they will use it with care. When you're in a position where
>>>>>>>>>> every serialization/deserialization round matters for the latency,
>>>>>>>>>> you're extremely careful with the topology and you will not
>>>>>>>>>> thoughtlessly add new key-changing operations without controlling how
>>>>>>>>>> it's going to change the overall topology.
>>>>>>>>>> 
>>>>>>>>>> By the way, if we later find a better solution, it's much easier to
>>>>>>>>>> deprecate a single `markAsPartitioned` operation than 5 method
>>>>>>>>>> overloads.
>>>>>>>>>> 
>>>>>>>>>> What do you think?
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 04.08.2021 4:23, Sophie Blee-Goldman wrote:
>>>>>>>>>>> Do we really need a whole DSL operator for this? I think the
>>>>>>>>>>> original name for this operator -- `cancelRepartition()` -- is
>>>>>>>>>>> itself a sign that this is not an operation on the stream itself
>>>>>>>>>>> but rather a command/request to whichever operator would have
>>>>>>>>>>> otherwise triggered this repartition.
>>>>>>>>>>> 
>>>>>>>>>>> What about instead adding a new field to the
>>>>>>>>>>> Grouped/Joined/StreamJoined config objects that signals them to
>>>>>>>>>>> skip the repartitioning?
>>>>>>>>>>> 
>>>>>>>>>>> The one downside to this specific proposal is that you would then
>>>>>>>>>>> need to specify this for every stateful operation downstream of
>>>>>>>>>>> the key-changing operation. So a better alternative might be to
>>>>>>>>>>> introduce this `skipRepartition` field, or whatever we want to
>>>>>>>>>>> call it, to the config object of the operator that's actually
>>>>>>>>>>> doing the key changing operation which is apparently preserving
>>>>>>>>>>> the partitioning.
>>>>>>>>>>> 
>>>>>>>>>>> Imo this would be more "safe" relative to the current proposal, as
>>>>>>>>>>> the user has to explicitly consider whether every key changing
>>>>>>>>>>> operation is indeed preserving the partitioning. Otherwise you
>>>>>>>>>>> could code up a topology with several key changing operations at
>>>>>>>>>>> the beginning which do require repartitioning. Then you get to the
>>>>>>>>>>> end of the topology and insert one final key changing operation
>>>>>>>>>>> that doesn't, assume you can just cancel the repartition, and
>>>>>>>>>>> suddenly you're wondering why your results are all screwed up.
>>>>>>>>>>> 
>>>>>>>>>>> On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Thanks for the KIP Ivan!
>>>>>>>>>>>> 
>>>>>>>>>>>> I think it's a good feature to give advanced users more control,
>>>>>>>>>>>> and allow them to build more efficient applications.
>>>>>>>>>>>> 
>>>>>>>>>>>> Not sure if I like the proposed name though (the good old "naming
>>>>>>>>>>>> things" discussion :))
>>>>>>>>>>>> 
>>>>>>>>>>>> Did you consider alternatives? What about
>>>>>>>>>>>> 
>>>>>>>>>>>>     - markAsPartitioned()
>>>>>>>>>>>>     - markAsKeyed()
>>>>>>>>>>>>     - skipRepartition()
>>>>>>>>>>>> 
>>>>>>>>>>>> Not sure if there are other ideas for a good name?
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>> 
>>>>>>>>>>>> On 6/24/21 7:45 AM, Ivan Ponomarev wrote:
>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I'd like to start a discussion for KIP-759:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
>>>>>>>>>>>>> 
>>>>>>>>>>>>> This is an offshoot of the discussion of KIP-655 for a `distinct`
>>>>>>>>>>>>> operator, which turned out to be a separate proposal.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The proposal is quite trivial, however, we still might consider
>>>>>>>>>>>>> alternatives (see 'Possible Alternatives' section).
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Ivan