You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Sagar <sa...@gmail.com> on 2022/05/31 09:44:14 UTC

[VOTE] KIP-837 Allow MultiCasting a Result Record.

Hi All,

I would like to start a voting thread on
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356.

I am just starting this as the discussion thread has been open for 10+
days. In case there are some comments, we can always discuss them over
there.

Thanks!
Sagar.

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by "Matthias J. Sax" <mj...@apache.org>.
Thanks.

Glad we could align.


-Matthias

On 12/21/22 2:09 AM, Sagar wrote:
> Hi All,
> 
> Just as an update, the changes described here:
> 
> ```
> Changes at a high level are:
> 
> 1) KeyQueryMetada enhanced to have a new method called partitions().
> 2) Lifting the restriction of a single partition for IQ. Now the
> restriction holds only for FK Join.
> ```
> 
> are reverted back. As things stand,  KeyQueryMetada exposes only the
> partition() method and the restriction for single partition is added back
> for IQ. This has been done based on the points raised by Matthias above.
> 
> The KIP has been updated accordingly.
> 
> Thanks!
> Sagar.
> 
> On Sat, Dec 10, 2022 at 12:09 AM Sagar <sa...@gmail.com> wrote:
> 
>> Hey Matthias,
>>
>> Actually I had shared the PR link for any potential issues that might have
>> gone missing. I guess it didn't come out that way in my response. Apologies
>> for that!
>>
>> I am more than happy to incorporate any feedback/changes or address any
>> concerns that are still present around this at this point as well.
>>
>> And I would keep in mind the feedback to provide more time in such a
>> scenario.
>>
>> Thanks!
>> Sagar.
>>
>> On Fri, Dec 9, 2022 at 11:41 PM Matthias J. Sax <mj...@apache.org> wrote:
>>
>>> It is what it is.
>>>
>>>> we did have internal discussions on this
>>>
>>> We sometimes have the case that a KIP need adjustment as stuff is
>>> discovered during coding. And having a discussion on the PR about it is
>>> fine. -- However, before the PR gets merge, the KIP change should be
>>> announced to verify that nobody has objections to he change, before we
>>> carry forward.
>>>
>>> It's up to the committer who reviews/merges the PR to ensure that this
>>> process is followed IMHO. I hope we can do better next time.
>>>
>>> (And yes, there was the 3.4 release KIP deadline that might explain it,
>>> but it seems important that we give enough time is make "tricky" changes
>>> and not rush into stuff IMHO.)
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 12/8/22 7:04 PM, Sagar wrote:
>>>> Thanks Matthias,
>>>>
>>>> Well, as things stand, we did have internal discussions on this and it
>>>> seemed ok to open it up for IQ and more importantly not ok to have it
>>>> opened up for FK-Join. And more importantly, the PR for this is already
>>>> merged and some of these things came up during that. Here's the PR link:
>>>> https://github.com/apache/kafka/pull/12803.
>>>>
>>>> Thanks!
>>>> Sagar.
>>>>
>>>>
>>>> On Fri, Dec 9, 2022 at 5:15 AM Matthias J. Sax <mj...@apache.org>
>>> wrote:
>>>>
>>>>> Ah. Missed it as it does not have a nice "code block" similar to
>>>>> `StreamPartitioner` changes.
>>>>>
>>>>> I understand the motivation, but I am wondering if we might head into a
>>>>> tricky direction? State stores (at least the built-in ones) and IQ are
>>>>> kinda build with the idea to have sharded data and that a multi-cast of
>>>>> keys is an anti-pattern?
>>>>>
>>>>> Maybe it's fine, but I also don't want to open Pandora's Box. Are we
>>>>> sure that generalizing the concepts does not cause issues in the
>>> future?
>>>>>
>>>>> Ie, should we claim that the multi-cast feature should be used for
>>>>> KStreams only, but not for KTables?
>>>>>
>>>>> Just want to double check that we are not doing something we regret
>>> later.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>> On 12/7/22 6:45 PM, Sagar wrote:
>>>>>> Hi Mathias,
>>>>>>
>>>>>> I did save it. The changes are added under Public Interfaces (Pt#2
>>> about
>>>>>> enhancing KeyQueryMetadata with partitions method) and
>>>>>> throwing IllegalArgumentException when StreamPartitioner#partitions
>>>>> method
>>>>>> returns multiple partitions for just FK-join instead of the earlier
>>>>> decided
>>>>>> FK-Join and IQ.
>>>>>>
>>>>>> The background is that for IQ, if the users have multi casted records
>>> to
>>>>>> multiple partitions during ingestion but the fetch returns only a
>>> single
>>>>>> partition, then it would be wrong. That's why the restriction was
>>> lifted
>>>>>> for IQ and that's the reason KeyQueryMetadata now has another
>>>>> partitions()
>>>>>> method to signify the same.
>>>>>>
>>>>>> FK-Join also has a similar case, but while reviewing it was felt that
>>>>>> FK-Join on it's own is fairly complicated and we don't need this
>>> feature
>>>>>> right away so the restriction still exists.
>>>>>>
>>>>>> Thanks!
>>>>>> Sagar.
>>>>>>
>>>>>>
>>>>>> On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax <mj...@apache.org>
>>> wrote:
>>>>>>
>>>>>>> I don't see any update on the wiki about it. Did you forget to hit
>>>>> "save"?
>>>>>>>
>>>>>>> Can you also provide some background? I am not sure right now if I
>>>>>>> understand the proposed changes?
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
>>>>>>>> Thanks Sagar, this makes sense to me -- we clearly need additional
>>>>>>> changes
>>>>>>>> to
>>>>>>>> avoid breaking IQ when using this feature, but I agree with
>>> continuing
>>>>> to
>>>>>>>> restrict
>>>>>>>> FKJ since they wouldn't stop working without it, and would become
>>> much
>>>>>>>> harder
>>>>>>>> to reason about (than they already are) if we did enable them to use
>>>>> it.
>>>>>>>>
>>>>>>>> And of course, they can still multicast the final results of a FKJ,
>>>>> they
>>>>>>>> just can't
>>>>>>>> mess with the internal workings of it in this way.
>>>>>>>>
>>>>>>>> On Tue, Dec 6, 2022 at 9:48 AM Sagar <sa...@gmail.com>
>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I made a couple of edits to the KIP which came up during the code
>>>>>>> review.
>>>>>>>>> Changes at a high level are:
>>>>>>>>>
>>>>>>>>> 1) KeyQueryMetada enhanced to have a new method called
>>> partitions().
>>>>>>>>> 2) Lifting the restriction of a single partition for IQ. Now the
>>>>>>>>> restriction holds only for FK Join.
>>>>>>>>>
>>>>>>>>> Updated KIP:
>>>>>>>>>
>>>>>>>
>>>>>
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> Sagar.
>>>>>>>>>
>>>>>>>>> On Mon, Sep 12, 2022 at 6:43 PM Sagar <sa...@gmail.com>
>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Bruno,
>>>>>>>>>>
>>>>>>>>>> Marking this as accepted.
>>>>>>>>>>
>>>>>>>>>> Thanks everyone for their comments/feedback.
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>> Sagar.
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <cadonna@apache.org
>>>>
>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Sagar,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the update and the PR!
>>>>>>>>>>>
>>>>>>>>>>> +1 (binding)
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Bruno
>>>>>>>>>>>
>>>>>>>>>>> On 10.09.22 18:57, Sagar wrote:
>>>>>>>>>>>> Hi Bruno,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks, I think these changes make sense to me. I have updated
>>> the
>>>>>>> KIP
>>>>>>>>>>>> accordingly.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>> Sagar.
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <
>>> cadonna@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Sagar,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would not drop the support for dropping records. I would also
>>>>> not
>>>>>>>>>>>>> return null from partitions(). Maybe an Optional can help
>>> here. An
>>>>>>>>>>> empty
>>>>>>>>>>>>> Optional would mean to use the default partitioning behavior of
>>>>> the
>>>>>>>>>>>>> producer. So we would have:
>>>>>>>>>>>>>
>>>>>>>>>>>>> - non-empty Optional, non-empty list of integers: partitions to
>>>>> send
>>>>>>>>>>> the
>>>>>>>>>>>>> record to
>>>>>>>>>>>>> - non-empty Optional, empty list of integers: drop the record
>>>>>>>>>>>>> - empty Optional: use default behavior
>>>>>>>>>>>>>
>>>>>>>>>>>>> What do other think?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Bruno
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 02.09.22 13:53, Sagar wrote:
>>>>>>>>>>>>>> Hello Bruno/Chris,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Since these are the last set of changes(I am assuming haha),
>>> it
>>>>>>>>> would
>>>>>>>>>>> be
>>>>>>>>>>>>>> great if you could review the 2 options from above so that we
>>> can
>>>>>>>>>>> close
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> voting. Of course I am happy to incorporate any other
>>> requisite
>>>>>>>>>>> changes.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>> Sagar.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <
>>>>> sagarmeansocean@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks Bruno for the great points.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I see 2 options here =>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1) As Chris suggested, drop the support for dropping records
>>> in
>>>>>>> the
>>>>>>>>>>>>>>> partitioner. That way, an empty list could signify the usage
>>> of
>>>>> a
>>>>>>>>>>>>> default
>>>>>>>>>>>>>>> partitioner. Also, if the deprecated partition() method
>>> returns
>>>>>>>>> null
>>>>>>>>>>>>>>> thereby signifying the default partitioner, the partitions()
>>> can
>>>>>>>>>>> return
>>>>>>>>>>>>> an
>>>>>>>>>>>>>>> empty list i.e default partitioner.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2) OR we treat a null return type of partitions() method to
>>>>>>> signify
>>>>>>>>>>> the
>>>>>>>>>>>>>>> usage of the default partitioner. In the default
>>> implementation
>>>>> of
>>>>>>>>>>>>>>> partitions() method, if partition() returns null, then even
>>>>>>>>>>> partitions()
>>>>>>>>>>>>>>> can return null(instead of an empty list). The
>>>>> RecordCollectorImpl
>>>>>>>>>>> code
>>>>>>>>>>>>> can
>>>>>>>>>>>>>>> also be modified accordingly. @Chris, to your point, we can
>>> even
>>>>>>>>> drop
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> support of dropping of records. It came up during KIP
>>>>> discussion,
>>>>>>>>>>> and I
>>>>>>>>>>>>>>> thought it might be a useful feature. Let me know what you
>>>>> think.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 3) Lastly about the partition number check. I wanted to avoid
>>>>> the
>>>>>>>>>>>>> throwing
>>>>>>>>>>>>>>> of exception so I thought adding it might be a useful
>>> feature.
>>>>> But
>>>>>>>>> as
>>>>>>>>>>>>> you
>>>>>>>>>>>>>>> pointed out, if it can break backwards compatibility, it's
>>>>> better
>>>>>>>>> to
>>>>>>>>>>>>> remove
>>>>>>>>>>>>>>> it.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>> Sagar.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton
>>>>>>>>>>> <ch...@aiven.io.invalid>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> +1 to Bruno's concerns about backward compatibility. Do we
>>>>>>>>> actually
>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>> support for dropping records in the partitioner? It doesn't
>>>>> seem
>>>>>>>>>>>>> necessary
>>>>>>>>>>>>>>>> based on the motivation for the KIP. If we remove that
>>> feature,
>>>>>>> we
>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>> handle null and/or empty lists by using the default
>>>>> partitioning,
>>>>>>>>>>>>>>>> equivalent to how we handle null return values from the
>>>>> existing
>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>> method today.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <
>>>>>>> cadonna@apache.org
>>>>>>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Sagar,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thank you for the updates!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I do not intend to prolong this vote thread more than
>>> needed,
>>>>>>>>> but I
>>>>>>>>>>>>>>>>> still have some points.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The deprecated partition method can return null if the
>>> default
>>>>>>>>>>>>>>>>> partitioning logic of the producer should be used.
>>>>>>>>>>>>>>>>> With the new method partitions() it seems that it is not
>>>>>>> possible
>>>>>>>>>>> to
>>>>>>>>>>>>> use
>>>>>>>>>>>>>>>>> the default partitioning logic, anymore.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Also, in the default implementation of method
>>> partitions(), a
>>>>>>>>>>> record
>>>>>>>>>>>>>>>>> that would use the default partitioning logic in method
>>>>>>>>> partition()
>>>>>>>>>>>>>>>>> would be dropped, which would break backward compatibility
>>>>> since
>>>>>>>>>>>>> Streams
>>>>>>>>>>>>>>>>> would always call the new method partitions() even though
>>> the
>>>>>>>>> users
>>>>>>>>>>>>>>>>> still implement the deprecated method partition().
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have a last point that we should probably discuss on the
>>> PR
>>>>>>> and
>>>>>>>>>>> not
>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>> the KIP but since you added the code in the KIP I need to
>>>>>>> mention
>>>>>>>>>>> it.
>>>>>>>>>>>>> I
>>>>>>>>>>>>>>>>> do not think you should check the validity of the partition
>>>>>>>>> number
>>>>>>>>>>>>> since
>>>>>>>>>>>>>>>>> the ProducerRecord does the same check and throws an
>>>>> exception.
>>>>>>>>> If
>>>>>>>>>>>>>>>>> Streams adds the same check but does not throw, the
>>> behavior
>>>>> is
>>>>>>>>> not
>>>>>>>>>>>>>>>>> backward compatible.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Bruno
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 30.08.22 12:43, Sagar wrote:
>>>>>>>>>>>>>>>>>> Thanks Bruno/Chris,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Even I agree that might be better to keep it simple like
>>> the
>>>>>>> way
>>>>>>>>>>>>> Chris
>>>>>>>>>>>>>>>>>> suggested. I have updated the KIP accordingly. I made
>>> couple
>>>>> of
>>>>>>>>>>> minor
>>>>>>>>>>>>>>>>>> changes to the KIP:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 1) One of them being the change of return type of
>>> partitions
>>>>>>>>>>> method
>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>> List to Set. This is to ensure that in case the
>>>>> implementation
>>>>>>>>> of
>>>>>>>>>>>>>>>>>> StreamPartitoner is buggy and ends up returning duplicate
>>>>>>>>>>>>>>>>>> partition numbers, we won't have duplicates thereby not
>>>>> trying
>>>>>>>>> to
>>>>>>>>>>>>>>>> send to
>>>>>>>>>>>>>>>>>> the same partition multiple times due to this.
>>>>>>>>>>>>>>>>>> 2) I also added a check to send the record only to valid
>>>>>>>>> partition
>>>>>>>>>>>>>>>>> numbers
>>>>>>>>>>>>>>>>>> and log and drop when the partition number is invalid.
>>> This
>>>>> is
>>>>>>>>>>> again
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> prevent errors for cases when the StreamPartitioner
>>>>>>>>> implementation
>>>>>>>>>>>>> has
>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>> bugs (since there are no validations as such).
>>>>>>>>>>>>>>>>>> 3) I also updated the Test Plan section based on the
>>>>> suggestion
>>>>>>>>>>> from
>>>>>>>>>>>>>>>>> Bruno.
>>>>>>>>>>>>>>>>>> 4) I updated the default implementation of partitions
>>> method
>>>>>>>>>>> based on
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> great catch from Chris!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Let me know if it looks fine now.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>>> Sagar.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <
>>>>>>>>> cadonna@apache.org
>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I am favour of discarding the sugar for broadcasting and
>>>>> leave
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> broadcasting to the implementation as Chris suggests. I
>>>>> think
>>>>>>>>>>> that
>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>> the cleanest option.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> Bruno
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I think it'd be useful to be more explicit about
>>>>> broadcasting
>>>>>>>>> to
>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>> topic
>>>>>>>>>>>>>>>>>>>> partitions rather than add implicit behavior for empty
>>>>> cases
>>>>>>>>>>> (empty
>>>>>>>>>>>>>>>>>>>> optional, empty list, etc.). The suggested enum approach
>>>>>>> would
>>>>>>>>>>>>>>>> address
>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>> nicely.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> It's also worth noting that there's no hard requirement
>>> to
>>>>>>> add
>>>>>>>>>>>>> sugar
>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>> broadcasting to all topic partitions since the API
>>> already
>>>>>>>>>>> provides
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> number of topic partitions available when calling a
>>> stream
>>>>>>>>>>>>>>>> partitioner.
>>>>>>>>>>>>>>>>>>> If
>>>>>>>>>>>>>>>>>>>> we can't find a clean way to add this support, it might
>>> be
>>>>>>>>>>> better
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> leave
>>>>>>>>>>>>>>>>>>>> it out and just let people implement this themselves
>>> with a
>>>>>>>>>>> line of
>>>>>>>>>>>>>>>>> Java
>>>>>>>>>>>>>>>>>>> 8
>>>>>>>>>>>>>>>>>>>> streams:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>             return IntStream.range(0,
>>>>>>>>>>>>>>>>>>>> numPartitions).boxed().collect(Collectors.toList());
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Also worth noting that there may be a bug in the default
>>>>>>>>>>>>>>>> implementation
>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>> the new StreamPartitioner::partitions method, since it
>>>>>>> doesn't
>>>>>>>>>>>>>>>> appear
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> correctly pick up on null return values from the
>>> partition
>>>>>>>>>>> method
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> translate them into empty lists.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Chris
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <
>>>>>>>>>>> cadonna@apache.org>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Sagar,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I do not see an issue with using an empty list together
>>>>> with
>>>>>>>>> an
>>>>>>>>>>>>>>>> empty
>>>>>>>>>>>>>>>>>>>>> Optional. A non-empty Optional that contains an empty
>>> list
>>>>>>>>>>> would
>>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>>>>>> say that there is no partition to which the record
>>> should
>>>>> be
>>>>>>>>>>> sent.
>>>>>>>>>>>>>>>> If
>>>>>>>>>>>>>>>>>>>>> there is no partition, the record is dropped.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> An empty Optional might be a way to say, broadcast to
>>> all
>>>>>>>>>>>>>>>> partitions.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Alternatively -- to make it more explicit -- you could
>>>>>>> return
>>>>>>>>>>> an
>>>>>>>>>>>>>>>>> object
>>>>>>>>>>>>>>>>>>>>> with an enum and a list of partitions. The enum could
>>> have
>>>>>>>>>>> values
>>>>>>>>>>>>>>>>> SOME,
>>>>>>>>>>>>>>>>>>>>> ALL, and NONE. If the value is SOME, the list of
>>>>> partitions
>>>>>>>>>>>>>>>> contains
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> partitions to which to send the record. If the value of
>>>>> the
>>>>>>>>>>> enum
>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>> ALL
>>>>>>>>>>>>>>>>>>>>> or NONE, the list of partitions is not used and might
>>> be
>>>>>>> even
>>>>>>>>>>> null
>>>>>>>>>>>>>>>>>>>>> (since in that case the list should not be used and it
>>>>> would
>>>>>>>>>>> be a
>>>>>>>>>>>>>>>> bug
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> use it).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>> Bruno
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On 24.08.22 20:11, Sagar wrote:
>>>>>>>>>>>>>>>>>>>>>> Thank you Bruno/Matthew for your comments.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I agree using null does seem error prone. However I
>>> think
>>>>>>>>>>> using a
>>>>>>>>>>>>>>>>>>>>> singleton
>>>>>>>>>>>>>>>>>>>>>> list of [-1] might be better in terms of usability, I
>>> am
>>>>>>>>>>> saying
>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>> because the KIP also has a provision to return an
>>> empty
>>>>>>> list
>>>>>>>>>>> to
>>>>>>>>>>>>>>>> refer
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>> dropping the record. So, an empty optional and an
>>> empty
>>>>>>> list
>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>> totally
>>>>>>>>>>>>>>>>>>>>>> different meanings which could get confusing.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Let me know what you think.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>>>>>>> Sagar.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de
>>>>> Detrich
>>>>>>>>>>>>>>>>>>>>>> <ma...@aiven.io.invalid> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I also concur with this, having an Optional in the
>>> type
>>>>>>>>>>> makes it
>>>>>>>>>>>>>>>>> very
>>>>>>>>>>>>>>>>>>>>>>> clear what’s going on and better signifies an
>>> absence of
>>>>>>>>>>> value
>>>>>>>>>>>>>>>> (or
>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>> case the broadcast value).
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>>> Matthew de Detrich
>>>>>>>>>>>>>>>>>>>>>>> Aiven Deutschland GmbH
>>>>>>>>>>>>>>>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin
>>>>>>>>>>>>>>>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>>>>>>>>>>>>>>>>>>>>>>> m: +491603708037
>>>>>>>>>>>>>>>>>>>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
>>>>>>>>>>>>>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org,
>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> 2.
>>>>>>>>>>>>>>>>>>>>>>>> I would prefer changing the return type of
>>> partitions()
>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty()
>>> as
>>>>> the
>>>>>>>>>>>>>>>> broadcast
>>>>>>>>>>>>>>>>>>>>>>>> value. IMO, The chances that an implementation
>>> returns
>>>>>>>>> null
>>>>>>>>>>> due
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>> bug
>>>>>>>>>>>>>>>>>>>>>>>> is much higher than that an implementation returns
>>> an
>>>>>>>>> empty
>>>>>>>>>>>>>>>>> Optional
>>>>>>>>>>>>>>>>>>>>> due
>>>>>>>>>>>>>>>>>>>>>>>> to a bug.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> 

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sagar <sa...@gmail.com>.
Hi All,

Just as an update, the changes described here:

```
Changes at a high level are:

1) KeyQueryMetada enhanced to have a new method called partitions().
2) Lifting the restriction of a single partition for IQ. Now the
restriction holds only for FK Join.
```

are reverted back. As things stand,  KeyQueryMetada exposes only the
partition() method and the restriction for single partition is added back
for IQ. This has been done based on the points raised by Matthias above.

The KIP has been updated accordingly.

Thanks!
Sagar.

On Sat, Dec 10, 2022 at 12:09 AM Sagar <sa...@gmail.com> wrote:

> Hey Matthias,
>
> Actually I had shared the PR link for any potential issues that might have
> gone missing. I guess it didn't come out that way in my response. Apologies
> for that!
>
> I am more than happy to incorporate any feedback/changes or address any
> concerns that are still present around this at this point as well.
>
> And I would keep in mind the feedback to provide more time in such a
> scenario.
>
> Thanks!
> Sagar.
>
> On Fri, Dec 9, 2022 at 11:41 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> It is what it is.
>>
>> > we did have internal discussions on this
>>
>> We sometimes have the case that a KIP need adjustment as stuff is
>> discovered during coding. And having a discussion on the PR about it is
>> fine. -- However, before the PR gets merge, the KIP change should be
>> announced to verify that nobody has objections to he change, before we
>> carry forward.
>>
>> It's up to the committer who reviews/merges the PR to ensure that this
>> process is followed IMHO. I hope we can do better next time.
>>
>> (And yes, there was the 3.4 release KIP deadline that might explain it,
>> but it seems important that we give enough time is make "tricky" changes
>> and not rush into stuff IMHO.)
>>
>>
>> -Matthias
>>
>>
>> On 12/8/22 7:04 PM, Sagar wrote:
>> > Thanks Matthias,
>> >
>> > Well, as things stand, we did have internal discussions on this and it
>> > seemed ok to open it up for IQ and more importantly not ok to have it
>> > opened up for FK-Join. And more importantly, the PR for this is already
>> > merged and some of these things came up during that. Here's the PR link:
>> > https://github.com/apache/kafka/pull/12803.
>> >
>> > Thanks!
>> > Sagar.
>> >
>> >
>> > On Fri, Dec 9, 2022 at 5:15 AM Matthias J. Sax <mj...@apache.org>
>> wrote:
>> >
>> >> Ah. Missed it as it does not have a nice "code block" similar to
>> >> `StreamPartitioner` changes.
>> >>
>> >> I understand the motivation, but I am wondering if we might head into a
>> >> tricky direction? State stores (at least the built-in ones) and IQ are
>> >> kinda build with the idea to have sharded data and that a multi-cast of
>> >> keys is an anti-pattern?
>> >>
>> >> Maybe it's fine, but I also don't want to open Pandora's Box. Are we
>> >> sure that generalizing the concepts does not cause issues in the
>> future?
>> >>
>> >> Ie, should we claim that the multi-cast feature should be used for
>> >> KStreams only, but not for KTables?
>> >>
>> >> Just want to double check that we are not doing something we regret
>> later.
>> >>
>> >>
>> >> -Matthias
>> >>
>> >>
>> >> On 12/7/22 6:45 PM, Sagar wrote:
>> >>> Hi Mathias,
>> >>>
>> >>> I did save it. The changes are added under Public Interfaces (Pt#2
>> about
>> >>> enhancing KeyQueryMetadata with partitions method) and
>> >>> throwing IllegalArgumentException when StreamPartitioner#partitions
>> >> method
>> >>> returns multiple partitions for just FK-join instead of the earlier
>> >> decided
>> >>> FK-Join and IQ.
>> >>>
>> >>> The background is that for IQ, if the users have multi casted records
>> to
>> >>> multiple partitions during ingestion but the fetch returns only a
>> single
>> >>> partition, then it would be wrong. That's why the restriction was
>> lifted
>> >>> for IQ and that's the reason KeyQueryMetadata now has another
>> >> partitions()
>> >>> method to signify the same.
>> >>>
>> >>> FK-Join also has a similar case, but while reviewing it was felt that
>> >>> FK-Join on it's own is fairly complicated and we don't need this
>> feature
>> >>> right away so the restriction still exists.
>> >>>
>> >>> Thanks!
>> >>> Sagar.
>> >>>
>> >>>
>> >>> On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>> >>>
>> >>>> I don't see any update on the wiki about it. Did you forget to hit
>> >> "save"?
>> >>>>
>> >>>> Can you also provide some background? I am not sure right now if I
>> >>>> understand the proposed changes?
>> >>>>
>> >>>>
>> >>>> -Matthias
>> >>>>
>> >>>> On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
>> >>>>> Thanks Sagar, this makes sense to me -- we clearly need additional
>> >>>> changes
>> >>>>> to
>> >>>>> avoid breaking IQ when using this feature, but I agree with
>> continuing
>> >> to
>> >>>>> restrict
>> >>>>> FKJ since they wouldn't stop working without it, and would become
>> much
>> >>>>> harder
>> >>>>> to reason about (than they already are) if we did enable them to use
>> >> it.
>> >>>>>
>> >>>>> And of course, they can still multicast the final results of a FKJ,
>> >> they
>> >>>>> just can't
>> >>>>> mess with the internal workings of it in this way.
>> >>>>>
>> >>>>> On Tue, Dec 6, 2022 at 9:48 AM Sagar <sa...@gmail.com>
>> >> wrote:
>> >>>>>
>> >>>>>> Hi All,
>> >>>>>>
>> >>>>>> I made a couple of edits to the KIP which came up during the code
>> >>>> review.
>> >>>>>> Changes at a high level are:
>> >>>>>>
>> >>>>>> 1) KeyQueryMetada enhanced to have a new method called
>> partitions().
>> >>>>>> 2) Lifting the restriction of a single partition for IQ. Now the
>> >>>>>> restriction holds only for FK Join.
>> >>>>>>
>> >>>>>> Updated KIP:
>> >>>>>>
>> >>>>
>> >>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
>> >>>>>>
>> >>>>>> Thanks!
>> >>>>>> Sagar.
>> >>>>>>
>> >>>>>> On Mon, Sep 12, 2022 at 6:43 PM Sagar <sa...@gmail.com>
>> >>>> wrote:
>> >>>>>>
>> >>>>>>> Thanks Bruno,
>> >>>>>>>
>> >>>>>>> Marking this as accepted.
>> >>>>>>>
>> >>>>>>> Thanks everyone for their comments/feedback.
>> >>>>>>>
>> >>>>>>> Thanks!
>> >>>>>>> Sagar.
>> >>>>>>>
>> >>>>>>> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <cadonna@apache.org
>> >
>> >>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Hi Sagar,
>> >>>>>>>>
>> >>>>>>>> Thanks for the update and the PR!
>> >>>>>>>>
>> >>>>>>>> +1 (binding)
>> >>>>>>>>
>> >>>>>>>> Best,
>> >>>>>>>> Bruno
>> >>>>>>>>
>> >>>>>>>> On 10.09.22 18:57, Sagar wrote:
>> >>>>>>>>> Hi Bruno,
>> >>>>>>>>>
>> >>>>>>>>> Thanks, I think these changes make sense to me. I have updated
>> the
>> >>>> KIP
>> >>>>>>>>> accordingly.
>> >>>>>>>>>
>> >>>>>>>>> Thanks!
>> >>>>>>>>> Sagar.
>> >>>>>>>>>
>> >>>>>>>>> On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <
>> cadonna@apache.org>
>> >>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Hi Sagar,
>> >>>>>>>>>>
>> >>>>>>>>>> I would not drop the support for dropping records. I would also
>> >> not
>> >>>>>>>>>> return null from partitions(). Maybe an Optional can help
>> here. An
>> >>>>>>>> empty
>> >>>>>>>>>> Optional would mean to use the default partitioning behavior of
>> >> the
>> >>>>>>>>>> producer. So we would have:
>> >>>>>>>>>>
>> >>>>>>>>>> - non-empty Optional, non-empty list of integers: partitions to
>> >> send
>> >>>>>>>> the
>> >>>>>>>>>> record to
>> >>>>>>>>>> - non-empty Optional, empty list of integers: drop the record
>> >>>>>>>>>> - empty Optional: use default behavior
>> >>>>>>>>>>
>> >>>>>>>>>> What do other think?
>> >>>>>>>>>>
>> >>>>>>>>>> Best,
>> >>>>>>>>>> Bruno
>> >>>>>>>>>>
>> >>>>>>>>>> On 02.09.22 13:53, Sagar wrote:
>> >>>>>>>>>>> Hello Bruno/Chris,
>> >>>>>>>>>>>
>> >>>>>>>>>>> Since these are the last set of changes(I am assuming haha),
>> it
>> >>>>>> would
>> >>>>>>>> be
>> >>>>>>>>>>> great if you could review the 2 options from above so that we
>> can
>> >>>>>>>> close
>> >>>>>>>>>> the
>> >>>>>>>>>>> voting. Of course I am happy to incorporate any other
>> requisite
>> >>>>>>>> changes.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks!
>> >>>>>>>>>>> Sagar.
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <
>> >> sagarmeansocean@gmail.com>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> Thanks Bruno for the great points.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I see 2 options here =>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 1) As Chris suggested, drop the support for dropping records
>> in
>> >>>> the
>> >>>>>>>>>>>> partitioner. That way, an empty list could signify the usage
>> of
>> >> a
>> >>>>>>>>>> default
>> >>>>>>>>>>>> partitioner. Also, if the deprecated partition() method
>> returns
>> >>>>>> null
>> >>>>>>>>>>>> thereby signifying the default partitioner, the partitions()
>> can
>> >>>>>>>> return
>> >>>>>>>>>> an
>> >>>>>>>>>>>> empty list i.e default partitioner.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 2) OR we treat a null return type of partitions() method to
>> >>>> signify
>> >>>>>>>> the
>> >>>>>>>>>>>> usage of the default partitioner. In the default
>> implementation
>> >> of
>> >>>>>>>>>>>> partitions() method, if partition() returns null, then even
>> >>>>>>>> partitions()
>> >>>>>>>>>>>> can return null(instead of an empty list). The
>> >> RecordCollectorImpl
>> >>>>>>>> code
>> >>>>>>>>>> can
>> >>>>>>>>>>>> also be modified accordingly. @Chris, to your point, we can
>> even
>> >>>>>> drop
>> >>>>>>>>>> the
>> >>>>>>>>>>>> support of dropping of records. It came up during KIP
>> >> discussion,
>> >>>>>>>> and I
>> >>>>>>>>>>>> thought it might be a useful feature. Let me know what you
>> >> think.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 3) Lastly about the partition number check. I wanted to avoid
>> >> the
>> >>>>>>>>>> throwing
>> >>>>>>>>>>>> of exception so I thought adding it might be a useful
>> feature.
>> >> But
>> >>>>>> as
>> >>>>>>>>>> you
>> >>>>>>>>>>>> pointed out, if it can break backwards compatibility, it's
>> >> better
>> >>>>>> to
>> >>>>>>>>>> remove
>> >>>>>>>>>>>> it.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thanks!
>> >>>>>>>>>>>> Sagar.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton
>> >>>>>>>> <ch...@aiven.io.invalid>
>> >>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>> +1 to Bruno's concerns about backward compatibility. Do we
>> >>>>>> actually
>> >>>>>>>>>> need
>> >>>>>>>>>>>>> support for dropping records in the partitioner? It doesn't
>> >> seem
>> >>>>>>>>>> necessary
>> >>>>>>>>>>>>> based on the motivation for the KIP. If we remove that
>> feature,
>> >>>> we
>> >>>>>>>>>> could
>> >>>>>>>>>>>>> handle null and/or empty lists by using the default
>> >> partitioning,
>> >>>>>>>>>>>>> equivalent to how we handle null return values from the
>> >> existing
>> >>>>>>>>>> partition
>> >>>>>>>>>>>>> method today.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <
>> >>>> cadonna@apache.org
>> >>>>>>>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Hi Sagar,
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Thank you for the updates!
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> I do not intend to prolong this vote thread more than
>> needed,
>> >>>>>> but I
>> >>>>>>>>>>>>>> still have some points.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> The deprecated partition method can return null if the
>> default
>> >>>>>>>>>>>>>> partitioning logic of the producer should be used.
>> >>>>>>>>>>>>>> With the new method partitions() it seems that it is not
>> >>>> possible
>> >>>>>>>> to
>> >>>>>>>>>> use
>> >>>>>>>>>>>>>> the default partitioning logic, anymore.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Also, in the default implementation of method
>> partitions(), a
>> >>>>>>>> record
>> >>>>>>>>>>>>>> that would use the default partitioning logic in method
>> >>>>>> partition()
>> >>>>>>>>>>>>>> would be dropped, which would break backward compatibility
>> >> since
>> >>>>>>>>>> Streams
>> >>>>>>>>>>>>>> would always call the new method partitions() even though
>> the
>> >>>>>> users
>> >>>>>>>>>>>>>> still implement the deprecated method partition().
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> I have a last point that we should probably discuss on the
>> PR
>> >>>> and
>> >>>>>>>> not
>> >>>>>>>>>> on
>> >>>>>>>>>>>>>> the KIP but since you added the code in the KIP I need to
>> >>>> mention
>> >>>>>>>> it.
>> >>>>>>>>>> I
>> >>>>>>>>>>>>>> do not think you should check the validity of the partition
>> >>>>>> number
>> >>>>>>>>>> since
>> >>>>>>>>>>>>>> the ProducerRecord does the same check and throws an
>> >> exception.
>> >>>>>> If
>> >>>>>>>>>>>>>> Streams adds the same check but does not throw, the
>> behavior
>> >> is
>> >>>>>> not
>> >>>>>>>>>>>>>> backward compatible.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Best,
>> >>>>>>>>>>>>>> Bruno
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> On 30.08.22 12:43, Sagar wrote:
>> >>>>>>>>>>>>>>> Thanks Bruno/Chris,
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Even I agree that might be better to keep it simple like
>> the
>> >>>> way
>> >>>>>>>>>> Chris
>> >>>>>>>>>>>>>>> suggested. I have updated the KIP accordingly. I made
>> couple
>> >> of
>> >>>>>>>> minor
>> >>>>>>>>>>>>>>> changes to the KIP:
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> 1) One of them being the change of return type of
>> partitions
>> >>>>>>>> method
>> >>>>>>>>>>>>> from
>> >>>>>>>>>>>>>>> List to Set. This is to ensure that in case the
>> >> implementation
>> >>>>>> of
>> >>>>>>>>>>>>>>> StreamPartitoner is buggy and ends up returning duplicate
>> >>>>>>>>>>>>>>> partition numbers, we won't have duplicates thereby not
>> >> trying
>> >>>>>> to
>> >>>>>>>>>>>>> send to
>> >>>>>>>>>>>>>>> the same partition multiple times due to this.
>> >>>>>>>>>>>>>>> 2) I also added a check to send the record only to valid
>> >>>>>> partition
>> >>>>>>>>>>>>>> numbers
>> >>>>>>>>>>>>>>> and log and drop when the partition number is invalid.
>> This
>> >> is
>> >>>>>>>> again
>> >>>>>>>>>>>>> to
>> >>>>>>>>>>>>>>> prevent errors for cases when the StreamPartitioner
>> >>>>>> implementation
>> >>>>>>>>>> has
>> >>>>>>>>>>>>>> some
>> >>>>>>>>>>>>>>> bugs (since there are no validations as such).
>> >>>>>>>>>>>>>>> 3) I also updated the Test Plan section based on the
>> >> suggestion
>> >>>>>>>> from
>> >>>>>>>>>>>>>> Bruno.
>> >>>>>>>>>>>>>>> 4) I updated the default implementation of partitions
>> method
>> >>>>>>>> based on
>> >>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>> great catch from Chris!
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Let me know if it looks fine now.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Thanks!
>> >>>>>>>>>>>>>>> Sagar.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <
>> >>>>>> cadonna@apache.org
>> >>>>>>>>>
>> >>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Hi,
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> I am favour of discarding the sugar for broadcasting and
>> >> leave
>> >>>>>>>> the
>> >>>>>>>>>>>>>>>> broadcasting to the implementation as Chris suggests. I
>> >> think
>> >>>>>>>> that
>> >>>>>>>>>> is
>> >>>>>>>>>>>>>>>> the cleanest option.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Best,
>> >>>>>>>>>>>>>>>> Bruno
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
>> >>>>>>>>>>>>>>>>> Hi all,
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> I think it'd be useful to be more explicit about
>> >> broadcasting
>> >>>>>> to
>> >>>>>>>>>> all
>> >>>>>>>>>>>>>>>> topic
>> >>>>>>>>>>>>>>>>> partitions rather than add implicit behavior for empty
>> >> cases
>> >>>>>>>> (empty
>> >>>>>>>>>>>>>>>>> optional, empty list, etc.). The suggested enum approach
>> >>>> would
>> >>>>>>>>>>>>> address
>> >>>>>>>>>>>>>>>> that
>> >>>>>>>>>>>>>>>>> nicely.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> It's also worth noting that there's no hard requirement
>> to
>> >>>> add
>> >>>>>>>>>> sugar
>> >>>>>>>>>>>>>> for
>> >>>>>>>>>>>>>>>>> broadcasting to all topic partitions since the API
>> already
>> >>>>>>>> provides
>> >>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>> number of topic partitions available when calling a
>> stream
>> >>>>>>>>>>>>> partitioner.
>> >>>>>>>>>>>>>>>> If
>> >>>>>>>>>>>>>>>>> we can't find a clean way to add this support, it might
>> be
>> >>>>>>>> better
>> >>>>>>>>>> to
>> >>>>>>>>>>>>>>>> leave
>> >>>>>>>>>>>>>>>>> it out and just let people implement this themselves
>> with a
>> >>>>>>>> line of
>> >>>>>>>>>>>>>> Java
>> >>>>>>>>>>>>>>>> 8
>> >>>>>>>>>>>>>>>>> streams:
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>            return IntStream.range(0,
>> >>>>>>>>>>>>>>>>> numPartitions).boxed().collect(Collectors.toList());
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Also worth noting that there may be a bug in the default
>> >>>>>>>>>>>>> implementation
>> >>>>>>>>>>>>>>>> for
>> >>>>>>>>>>>>>>>>> the new StreamPartitioner::partitions method, since it
>> >>>> doesn't
>> >>>>>>>>>>>>> appear
>> >>>>>>>>>>>>>> to
>> >>>>>>>>>>>>>>>>> correctly pick up on null return values from the
>> partition
>> >>>>>>>> method
>> >>>>>>>>>>>>> and
>> >>>>>>>>>>>>>>>>> translate them into empty lists.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Cheers,
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Chris
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <
>> >>>>>>>> cadonna@apache.org>
>> >>>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> Hi Sagar,
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> I do not see an issue with using an empty list together
>> >> with
>> >>>>>> an
>> >>>>>>>>>>>>> empty
>> >>>>>>>>>>>>>>>>>> Optional. A non-empty Optional that contains an empty
>> list
>> >>>>>>>> would
>> >>>>>>>>>>>>> just
>> >>>>>>>>>>>>>>>>>> say that there is no partition to which the record
>> should
>> >> be
>> >>>>>>>> sent.
>> >>>>>>>>>>>>> If
>> >>>>>>>>>>>>>>>>>> there is no partition, the record is dropped.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> An empty Optional might be a way to say, broadcast to
>> all
>> >>>>>>>>>>>>> partitions.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> Alternatively -- to make it more explicit -- you could
>> >>>> return
>> >>>>>>>> an
>> >>>>>>>>>>>>>> object
>> >>>>>>>>>>>>>>>>>> with an enum and a list of partitions. The enum could
>> have
>> >>>>>>>> values
>> >>>>>>>>>>>>>> SOME,
>> >>>>>>>>>>>>>>>>>> ALL, and NONE. If the value is SOME, the list of
>> >> partitions
>> >>>>>>>>>>>>> contains
>> >>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>> partitions to which to send the record. If the value of
>> >> the
>> >>>>>>>> enum
>> >>>>>>>>>> is
>> >>>>>>>>>>>>>> ALL
>> >>>>>>>>>>>>>>>>>> or NONE, the list of partitions is not used and might
>> be
>> >>>> even
>> >>>>>>>> null
>> >>>>>>>>>>>>>>>>>> (since in that case the list should not be used and it
>> >> would
>> >>>>>>>> be a
>> >>>>>>>>>>>>> bug
>> >>>>>>>>>>>>>> to
>> >>>>>>>>>>>>>>>>>> use it).
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> Best,
>> >>>>>>>>>>>>>>>>>> Bruno
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> On 24.08.22 20:11, Sagar wrote:
>> >>>>>>>>>>>>>>>>>>> Thank you Bruno/Matthew for your comments.
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> I agree using null does seem error prone. However I
>> think
>> >>>>>>>> using a
>> >>>>>>>>>>>>>>>>>> singleton
>> >>>>>>>>>>>>>>>>>>> list of [-1] might be better in terms of usability, I
>> am
>> >>>>>>>> saying
>> >>>>>>>>>>>>> this
>> >>>>>>>>>>>>>>>>>>> because the KIP also has a provision to return an
>> empty
>> >>>> list
>> >>>>>>>> to
>> >>>>>>>>>>>>> refer
>> >>>>>>>>>>>>>>>> to
>> >>>>>>>>>>>>>>>>>>> dropping the record. So, an empty optional and an
>> empty
>> >>>> list
>> >>>>>>>> have
>> >>>>>>>>>>>>>>>> totally
>> >>>>>>>>>>>>>>>>>>> different meanings which could get confusing.
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> Let me know what you think.
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> Thanks!
>> >>>>>>>>>>>>>>>>>>> Sagar.
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de
>> >> Detrich
>> >>>>>>>>>>>>>>>>>>> <ma...@aiven.io.invalid> wrote:
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> I also concur with this, having an Optional in the
>> type
>> >>>>>>>> makes it
>> >>>>>>>>>>>>>> very
>> >>>>>>>>>>>>>>>>>>>> clear what’s going on and better signifies an
>> absence of
>> >>>>>>>> value
>> >>>>>>>>>>>>> (or
>> >>>>>>>>>>>>>> in
>> >>>>>>>>>>>>>>>>>> this
>> >>>>>>>>>>>>>>>>>>>> case the broadcast value).
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> --
>> >>>>>>>>>>>>>>>>>>>> Matthew de Detrich
>> >>>>>>>>>>>>>>>>>>>> Aiven Deutschland GmbH
>> >>>>>>>>>>>>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin
>> >>>>>>>>>>>>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>> >>>>>>>>>>>>>>>>>>>> m: +491603708037
>> >>>>>>>>>>>>>>>>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
>> >>>>>>>>>>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org,
>> >>>> wrote:
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> 2.
>> >>>>>>>>>>>>>>>>>>>>> I would prefer changing the return type of
>> partitions()
>> >>>> to
>> >>>>>>>>>>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty()
>> as
>> >> the
>> >>>>>>>>>>>>> broadcast
>> >>>>>>>>>>>>>>>>>>>>> value. IMO, The chances that an implementation
>> returns
>> >>>>>> null
>> >>>>>>>> due
>> >>>>>>>>>>>>> to
>> >>>>>>>>>>>>>> a
>> >>>>>>>>>>>>>>>>>> bug
>> >>>>>>>>>>>>>>>>>>>>> is much higher than that an implementation returns
>> an
>> >>>>>> empty
>> >>>>>>>>>>>>>> Optional
>> >>>>>>>>>>>>>>>>>> due
>> >>>>>>>>>>>>>>>>>>>>> to a bug.
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sagar <sa...@gmail.com>.
Hey Matthias,

Actually I had shared the PR link for any potential issues that might have
gone missing. I guess it didn't come out that way in my response. Apologies
for that!

I am more than happy to incorporate any feedback/changes or address any
concerns that are still present around this at this point as well.

And I would keep in mind the feedback to provide more time in such a
scenario.

Thanks!
Sagar.

On Fri, Dec 9, 2022 at 11:41 PM Matthias J. Sax <mj...@apache.org> wrote:

> It is what it is.
>
> > we did have internal discussions on this
>
> We sometimes have the case that a KIP need adjustment as stuff is
> discovered during coding. And having a discussion on the PR about it is
> fine. -- However, before the PR gets merge, the KIP change should be
> announced to verify that nobody has objections to he change, before we
> carry forward.
>
> It's up to the committer who reviews/merges the PR to ensure that this
> process is followed IMHO. I hope we can do better next time.
>
> (And yes, there was the 3.4 release KIP deadline that might explain it,
> but it seems important that we give enough time is make "tricky" changes
> and not rush into stuff IMHO.)
>
>
> -Matthias
>
>
> On 12/8/22 7:04 PM, Sagar wrote:
> > Thanks Matthias,
> >
> > Well, as things stand, we did have internal discussions on this and it
> > seemed ok to open it up for IQ and more importantly not ok to have it
> > opened up for FK-Join. And more importantly, the PR for this is already
> > merged and some of these things came up during that. Here's the PR link:
> > https://github.com/apache/kafka/pull/12803.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Fri, Dec 9, 2022 at 5:15 AM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> Ah. Missed it as it does not have a nice "code block" similar to
> >> `StreamPartitioner` changes.
> >>
> >> I understand the motivation, but I am wondering if we might head into a
> >> tricky direction? State stores (at least the built-in ones) and IQ are
> >> kinda build with the idea to have sharded data and that a multi-cast of
> >> keys is an anti-pattern?
> >>
> >> Maybe it's fine, but I also don't want to open Pandora's Box. Are we
> >> sure that generalizing the concepts does not cause issues in the future?
> >>
> >> Ie, should we claim that the multi-cast feature should be used for
> >> KStreams only, but not for KTables?
> >>
> >> Just want to double check that we are not doing something we regret
> later.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 12/7/22 6:45 PM, Sagar wrote:
> >>> Hi Mathias,
> >>>
> >>> I did save it. The changes are added under Public Interfaces (Pt#2
> about
> >>> enhancing KeyQueryMetadata with partitions method) and
> >>> throwing IllegalArgumentException when StreamPartitioner#partitions
> >> method
> >>> returns multiple partitions for just FK-join instead of the earlier
> >> decided
> >>> FK-Join and IQ.
> >>>
> >>> The background is that for IQ, if the users have multi casted records
> to
> >>> multiple partitions during ingestion but the fetch returns only a
> single
> >>> partition, then it would be wrong. That's why the restriction was
> lifted
> >>> for IQ and that's the reason KeyQueryMetadata now has another
> >> partitions()
> >>> method to signify the same.
> >>>
> >>> FK-Join also has a similar case, but while reviewing it was felt that
> >>> FK-Join on it's own is fairly complicated and we don't need this
> feature
> >>> right away so the restriction still exists.
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>>
> >>> On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax <mj...@apache.org>
> wrote:
> >>>
> >>>> I don't see any update on the wiki about it. Did you forget to hit
> >> "save"?
> >>>>
> >>>> Can you also provide some background? I am not sure right now if I
> >>>> understand the proposed changes?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>> On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
> >>>>> Thanks Sagar, this makes sense to me -- we clearly need additional
> >>>> changes
> >>>>> to
> >>>>> avoid breaking IQ when using this feature, but I agree with
> continuing
> >> to
> >>>>> restrict
> >>>>> FKJ since they wouldn't stop working without it, and would become
> much
> >>>>> harder
> >>>>> to reason about (than they already are) if we did enable them to use
> >> it.
> >>>>>
> >>>>> And of course, they can still multicast the final results of a FKJ,
> >> they
> >>>>> just can't
> >>>>> mess with the internal workings of it in this way.
> >>>>>
> >>>>> On Tue, Dec 6, 2022 at 9:48 AM Sagar <sa...@gmail.com>
> >> wrote:
> >>>>>
> >>>>>> Hi All,
> >>>>>>
> >>>>>> I made a couple of edits to the KIP which came up during the code
> >>>> review.
> >>>>>> Changes at a high level are:
> >>>>>>
> >>>>>> 1) KeyQueryMetada enhanced to have a new method called partitions().
> >>>>>> 2) Lifting the restriction of a single partition for IQ. Now the
> >>>>>> restriction holds only for FK Join.
> >>>>>>
> >>>>>> Updated KIP:
> >>>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> >>>>>>
> >>>>>> Thanks!
> >>>>>> Sagar.
> >>>>>>
> >>>>>> On Mon, Sep 12, 2022 at 6:43 PM Sagar <sa...@gmail.com>
> >>>> wrote:
> >>>>>>
> >>>>>>> Thanks Bruno,
> >>>>>>>
> >>>>>>> Marking this as accepted.
> >>>>>>>
> >>>>>>> Thanks everyone for their comments/feedback.
> >>>>>>>
> >>>>>>> Thanks!
> >>>>>>> Sagar.
> >>>>>>>
> >>>>>>> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <ca...@apache.org>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Sagar,
> >>>>>>>>
> >>>>>>>> Thanks for the update and the PR!
> >>>>>>>>
> >>>>>>>> +1 (binding)
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Bruno
> >>>>>>>>
> >>>>>>>> On 10.09.22 18:57, Sagar wrote:
> >>>>>>>>> Hi Bruno,
> >>>>>>>>>
> >>>>>>>>> Thanks, I think these changes make sense to me. I have updated
> the
> >>>> KIP
> >>>>>>>>> accordingly.
> >>>>>>>>>
> >>>>>>>>> Thanks!
> >>>>>>>>> Sagar.
> >>>>>>>>>
> >>>>>>>>> On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <cadonna@apache.org
> >
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Sagar,
> >>>>>>>>>>
> >>>>>>>>>> I would not drop the support for dropping records. I would also
> >> not
> >>>>>>>>>> return null from partitions(). Maybe an Optional can help here.
> An
> >>>>>>>> empty
> >>>>>>>>>> Optional would mean to use the default partitioning behavior of
> >> the
> >>>>>>>>>> producer. So we would have:
> >>>>>>>>>>
> >>>>>>>>>> - non-empty Optional, non-empty list of integers: partitions to
> >> send
> >>>>>>>> the
> >>>>>>>>>> record to
> >>>>>>>>>> - non-empty Optional, empty list of integers: drop the record
> >>>>>>>>>> - empty Optional: use default behavior
> >>>>>>>>>>
> >>>>>>>>>> What do other think?
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Bruno
> >>>>>>>>>>
> >>>>>>>>>> On 02.09.22 13:53, Sagar wrote:
> >>>>>>>>>>> Hello Bruno/Chris,
> >>>>>>>>>>>
> >>>>>>>>>>> Since these are the last set of changes(I am assuming haha), it
> >>>>>> would
> >>>>>>>> be
> >>>>>>>>>>> great if you could review the 2 options from above so that we
> can
> >>>>>>>> close
> >>>>>>>>>> the
> >>>>>>>>>>> voting. Of course I am happy to incorporate any other requisite
> >>>>>>>> changes.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks!
> >>>>>>>>>>> Sagar.
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <
> >> sagarmeansocean@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Thanks Bruno for the great points.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I see 2 options here =>
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1) As Chris suggested, drop the support for dropping records
> in
> >>>> the
> >>>>>>>>>>>> partitioner. That way, an empty list could signify the usage
> of
> >> a
> >>>>>>>>>> default
> >>>>>>>>>>>> partitioner. Also, if the deprecated partition() method
> returns
> >>>>>> null
> >>>>>>>>>>>> thereby signifying the default partitioner, the partitions()
> can
> >>>>>>>> return
> >>>>>>>>>> an
> >>>>>>>>>>>> empty list i.e default partitioner.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2) OR we treat a null return type of partitions() method to
> >>>> signify
> >>>>>>>> the
> >>>>>>>>>>>> usage of the default partitioner. In the default
> implementation
> >> of
> >>>>>>>>>>>> partitions() method, if partition() returns null, then even
> >>>>>>>> partitions()
> >>>>>>>>>>>> can return null(instead of an empty list). The
> >> RecordCollectorImpl
> >>>>>>>> code
> >>>>>>>>>> can
> >>>>>>>>>>>> also be modified accordingly. @Chris, to your point, we can
> even
> >>>>>> drop
> >>>>>>>>>> the
> >>>>>>>>>>>> support of dropping of records. It came up during KIP
> >> discussion,
> >>>>>>>> and I
> >>>>>>>>>>>> thought it might be a useful feature. Let me know what you
> >> think.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3) Lastly about the partition number check. I wanted to avoid
> >> the
> >>>>>>>>>> throwing
> >>>>>>>>>>>> of exception so I thought adding it might be a useful feature.
> >> But
> >>>>>> as
> >>>>>>>>>> you
> >>>>>>>>>>>> pointed out, if it can break backwards compatibility, it's
> >> better
> >>>>>> to
> >>>>>>>>>> remove
> >>>>>>>>>>>> it.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks!
> >>>>>>>>>>>> Sagar.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton
> >>>>>>>> <ch...@aiven.io.invalid>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> +1 to Bruno's concerns about backward compatibility. Do we
> >>>>>> actually
> >>>>>>>>>> need
> >>>>>>>>>>>>> support for dropping records in the partitioner? It doesn't
> >> seem
> >>>>>>>>>> necessary
> >>>>>>>>>>>>> based on the motivation for the KIP. If we remove that
> feature,
> >>>> we
> >>>>>>>>>> could
> >>>>>>>>>>>>> handle null and/or empty lists by using the default
> >> partitioning,
> >>>>>>>>>>>>> equivalent to how we handle null return values from the
> >> existing
> >>>>>>>>>> partition
> >>>>>>>>>>>>> method today.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <
> >>>> cadonna@apache.org
> >>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Sagar,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thank you for the updates!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I do not intend to prolong this vote thread more than
> needed,
> >>>>>> but I
> >>>>>>>>>>>>>> still have some points.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The deprecated partition method can return null if the
> default
> >>>>>>>>>>>>>> partitioning logic of the producer should be used.
> >>>>>>>>>>>>>> With the new method partitions() it seems that it is not
> >>>> possible
> >>>>>>>> to
> >>>>>>>>>> use
> >>>>>>>>>>>>>> the default partitioning logic, anymore.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Also, in the default implementation of method partitions(),
> a
> >>>>>>>> record
> >>>>>>>>>>>>>> that would use the default partitioning logic in method
> >>>>>> partition()
> >>>>>>>>>>>>>> would be dropped, which would break backward compatibility
> >> since
> >>>>>>>>>> Streams
> >>>>>>>>>>>>>> would always call the new method partitions() even though
> the
> >>>>>> users
> >>>>>>>>>>>>>> still implement the deprecated method partition().
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I have a last point that we should probably discuss on the
> PR
> >>>> and
> >>>>>>>> not
> >>>>>>>>>> on
> >>>>>>>>>>>>>> the KIP but since you added the code in the KIP I need to
> >>>> mention
> >>>>>>>> it.
> >>>>>>>>>> I
> >>>>>>>>>>>>>> do not think you should check the validity of the partition
> >>>>>> number
> >>>>>>>>>> since
> >>>>>>>>>>>>>> the ProducerRecord does the same check and throws an
> >> exception.
> >>>>>> If
> >>>>>>>>>>>>>> Streams adds the same check but does not throw, the behavior
> >> is
> >>>>>> not
> >>>>>>>>>>>>>> backward compatible.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 30.08.22 12:43, Sagar wrote:
> >>>>>>>>>>>>>>> Thanks Bruno/Chris,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Even I agree that might be better to keep it simple like
> the
> >>>> way
> >>>>>>>>>> Chris
> >>>>>>>>>>>>>>> suggested. I have updated the KIP accordingly. I made
> couple
> >> of
> >>>>>>>> minor
> >>>>>>>>>>>>>>> changes to the KIP:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1) One of them being the change of return type of
> partitions
> >>>>>>>> method
> >>>>>>>>>>>>> from
> >>>>>>>>>>>>>>> List to Set. This is to ensure that in case the
> >> implementation
> >>>>>> of
> >>>>>>>>>>>>>>> StreamPartitoner is buggy and ends up returning duplicate
> >>>>>>>>>>>>>>> partition numbers, we won't have duplicates thereby not
> >> trying
> >>>>>> to
> >>>>>>>>>>>>> send to
> >>>>>>>>>>>>>>> the same partition multiple times due to this.
> >>>>>>>>>>>>>>> 2) I also added a check to send the record only to valid
> >>>>>> partition
> >>>>>>>>>>>>>> numbers
> >>>>>>>>>>>>>>> and log and drop when the partition number is invalid. This
> >> is
> >>>>>>>> again
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>>> prevent errors for cases when the StreamPartitioner
> >>>>>> implementation
> >>>>>>>>>> has
> >>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>> bugs (since there are no validations as such).
> >>>>>>>>>>>>>>> 3) I also updated the Test Plan section based on the
> >> suggestion
> >>>>>>>> from
> >>>>>>>>>>>>>> Bruno.
> >>>>>>>>>>>>>>> 4) I updated the default implementation of partitions
> method
> >>>>>>>> based on
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>> great catch from Chris!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Let me know if it looks fine now.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>> Sagar.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <
> >>>>>> cadonna@apache.org
> >>>>>>>>>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I am favour of discarding the sugar for broadcasting and
> >> leave
> >>>>>>>> the
> >>>>>>>>>>>>>>>> broadcasting to the implementation as Chris suggests. I
> >> think
> >>>>>>>> that
> >>>>>>>>>> is
> >>>>>>>>>>>>>>>> the cleanest option.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
> >>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I think it'd be useful to be more explicit about
> >> broadcasting
> >>>>>> to
> >>>>>>>>>> all
> >>>>>>>>>>>>>>>> topic
> >>>>>>>>>>>>>>>>> partitions rather than add implicit behavior for empty
> >> cases
> >>>>>>>> (empty
> >>>>>>>>>>>>>>>>> optional, empty list, etc.). The suggested enum approach
> >>>> would
> >>>>>>>>>>>>> address
> >>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>> nicely.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> It's also worth noting that there's no hard requirement
> to
> >>>> add
> >>>>>>>>>> sugar
> >>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>> broadcasting to all topic partitions since the API
> already
> >>>>>>>> provides
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>> number of topic partitions available when calling a
> stream
> >>>>>>>>>>>>> partitioner.
> >>>>>>>>>>>>>>>> If
> >>>>>>>>>>>>>>>>> we can't find a clean way to add this support, it might
> be
> >>>>>>>> better
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>> leave
> >>>>>>>>>>>>>>>>> it out and just let people implement this themselves
> with a
> >>>>>>>> line of
> >>>>>>>>>>>>>> Java
> >>>>>>>>>>>>>>>> 8
> >>>>>>>>>>>>>>>>> streams:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>            return IntStream.range(0,
> >>>>>>>>>>>>>>>>> numPartitions).boxed().collect(Collectors.toList());
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Also worth noting that there may be a bug in the default
> >>>>>>>>>>>>> implementation
> >>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>> the new StreamPartitioner::partitions method, since it
> >>>> doesn't
> >>>>>>>>>>>>> appear
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> correctly pick up on null return values from the
> partition
> >>>>>>>> method
> >>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>> translate them into empty lists.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <
> >>>>>>>> cadonna@apache.org>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi Sagar,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I do not see an issue with using an empty list together
> >> with
> >>>>>> an
> >>>>>>>>>>>>> empty
> >>>>>>>>>>>>>>>>>> Optional. A non-empty Optional that contains an empty
> list
> >>>>>>>> would
> >>>>>>>>>>>>> just
> >>>>>>>>>>>>>>>>>> say that there is no partition to which the record
> should
> >> be
> >>>>>>>> sent.
> >>>>>>>>>>>>> If
> >>>>>>>>>>>>>>>>>> there is no partition, the record is dropped.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> An empty Optional might be a way to say, broadcast to
> all
> >>>>>>>>>>>>> partitions.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Alternatively -- to make it more explicit -- you could
> >>>> return
> >>>>>>>> an
> >>>>>>>>>>>>>> object
> >>>>>>>>>>>>>>>>>> with an enum and a list of partitions. The enum could
> have
> >>>>>>>> values
> >>>>>>>>>>>>>> SOME,
> >>>>>>>>>>>>>>>>>> ALL, and NONE. If the value is SOME, the list of
> >> partitions
> >>>>>>>>>>>>> contains
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> partitions to which to send the record. If the value of
> >> the
> >>>>>>>> enum
> >>>>>>>>>> is
> >>>>>>>>>>>>>> ALL
> >>>>>>>>>>>>>>>>>> or NONE, the list of partitions is not used and might be
> >>>> even
> >>>>>>>> null
> >>>>>>>>>>>>>>>>>> (since in that case the list should not be used and it
> >> would
> >>>>>>>> be a
> >>>>>>>>>>>>> bug
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> use it).
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 24.08.22 20:11, Sagar wrote:
> >>>>>>>>>>>>>>>>>>> Thank you Bruno/Matthew for your comments.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I agree using null does seem error prone. However I
> think
> >>>>>>>> using a
> >>>>>>>>>>>>>>>>>> singleton
> >>>>>>>>>>>>>>>>>>> list of [-1] might be better in terms of usability, I
> am
> >>>>>>>> saying
> >>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>> because the KIP also has a provision to return an empty
> >>>> list
> >>>>>>>> to
> >>>>>>>>>>>>> refer
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> dropping the record. So, an empty optional and an empty
> >>>> list
> >>>>>>>> have
> >>>>>>>>>>>>>>>> totally
> >>>>>>>>>>>>>>>>>>> different meanings which could get confusing.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Let me know what you think.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>>>>>> Sagar.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de
> >> Detrich
> >>>>>>>>>>>>>>>>>>> <ma...@aiven.io.invalid> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I also concur with this, having an Optional in the
> type
> >>>>>>>> makes it
> >>>>>>>>>>>>>> very
> >>>>>>>>>>>>>>>>>>>> clear what’s going on and better signifies an absence
> of
> >>>>>>>> value
> >>>>>>>>>>>>> (or
> >>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>> case the broadcast value).
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>> Matthew de Detrich
> >>>>>>>>>>>>>>>>>>>> Aiven Deutschland GmbH
> >>>>>>>>>>>>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin
> >>>>>>>>>>>>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> >>>>>>>>>>>>>>>>>>>> m: +491603708037
> >>>>>>>>>>>>>>>>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
> >>>>>>>>>>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org,
> >>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>>>> I would prefer changing the return type of
> partitions()
> >>>> to
> >>>>>>>>>>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as
> >> the
> >>>>>>>>>>>>> broadcast
> >>>>>>>>>>>>>>>>>>>>> value. IMO, The chances that an implementation
> returns
> >>>>>> null
> >>>>>>>> due
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>> bug
> >>>>>>>>>>>>>>>>>>>>> is much higher than that an implementation returns an
> >>>>>> empty
> >>>>>>>>>>>>>> Optional
> >>>>>>>>>>>>>>>>>> due
> >>>>>>>>>>>>>>>>>>>>> to a bug.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by "Matthias J. Sax" <mj...@apache.org>.
It is what it is.

> we did have internal discussions on this 

We sometimes have the case that a KIP need adjustment as stuff is 
discovered during coding. And having a discussion on the PR about it is 
fine. -- However, before the PR gets merge, the KIP change should be 
announced to verify that nobody has objections to he change, before we 
carry forward.

It's up to the committer who reviews/merges the PR to ensure that this 
process is followed IMHO. I hope we can do better next time.

(And yes, there was the 3.4 release KIP deadline that might explain it, 
but it seems important that we give enough time is make "tricky" changes 
and not rush into stuff IMHO.)


-Matthias


On 12/8/22 7:04 PM, Sagar wrote:
> Thanks Matthias,
> 
> Well, as things stand, we did have internal discussions on this and it
> seemed ok to open it up for IQ and more importantly not ok to have it
> opened up for FK-Join. And more importantly, the PR for this is already
> merged and some of these things came up during that. Here's the PR link:
> https://github.com/apache/kafka/pull/12803.
> 
> Thanks!
> Sagar.
> 
> 
> On Fri, Dec 9, 2022 at 5:15 AM Matthias J. Sax <mj...@apache.org> wrote:
> 
>> Ah. Missed it as it does not have a nice "code block" similar to
>> `StreamPartitioner` changes.
>>
>> I understand the motivation, but I am wondering if we might head into a
>> tricky direction? State stores (at least the built-in ones) and IQ are
>> kinda build with the idea to have sharded data and that a multi-cast of
>> keys is an anti-pattern?
>>
>> Maybe it's fine, but I also don't want to open Pandora's Box. Are we
>> sure that generalizing the concepts does not cause issues in the future?
>>
>> Ie, should we claim that the multi-cast feature should be used for
>> KStreams only, but not for KTables?
>>
>> Just want to double check that we are not doing something we regret later.
>>
>>
>> -Matthias
>>
>>
>> On 12/7/22 6:45 PM, Sagar wrote:
>>> Hi Mathias,
>>>
>>> I did save it. The changes are added under Public Interfaces (Pt#2 about
>>> enhancing KeyQueryMetadata with partitions method) and
>>> throwing IllegalArgumentException when StreamPartitioner#partitions
>> method
>>> returns multiple partitions for just FK-join instead of the earlier
>> decided
>>> FK-Join and IQ.
>>>
>>> The background is that for IQ, if the users have multi casted records to
>>> multiple partitions during ingestion but the fetch returns only a single
>>> partition, then it would be wrong. That's why the restriction was lifted
>>> for IQ and that's the reason KeyQueryMetadata now has another
>> partitions()
>>> method to signify the same.
>>>
>>> FK-Join also has a similar case, but while reviewing it was felt that
>>> FK-Join on it's own is fairly complicated and we don't need this feature
>>> right away so the restriction still exists.
>>>
>>> Thanks!
>>> Sagar.
>>>
>>>
>>> On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax <mj...@apache.org> wrote:
>>>
>>>> I don't see any update on the wiki about it. Did you forget to hit
>> "save"?
>>>>
>>>> Can you also provide some background? I am not sure right now if I
>>>> understand the proposed changes?
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
>>>>> Thanks Sagar, this makes sense to me -- we clearly need additional
>>>> changes
>>>>> to
>>>>> avoid breaking IQ when using this feature, but I agree with continuing
>> to
>>>>> restrict
>>>>> FKJ since they wouldn't stop working without it, and would become much
>>>>> harder
>>>>> to reason about (than they already are) if we did enable them to use
>> it.
>>>>>
>>>>> And of course, they can still multicast the final results of a FKJ,
>> they
>>>>> just can't
>>>>> mess with the internal workings of it in this way.
>>>>>
>>>>> On Tue, Dec 6, 2022 at 9:48 AM Sagar <sa...@gmail.com>
>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I made a couple of edits to the KIP which came up during the code
>>>> review.
>>>>>> Changes at a high level are:
>>>>>>
>>>>>> 1) KeyQueryMetada enhanced to have a new method called partitions().
>>>>>> 2) Lifting the restriction of a single partition for IQ. Now the
>>>>>> restriction holds only for FK Join.
>>>>>>
>>>>>> Updated KIP:
>>>>>>
>>>>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
>>>>>>
>>>>>> Thanks!
>>>>>> Sagar.
>>>>>>
>>>>>> On Mon, Sep 12, 2022 at 6:43 PM Sagar <sa...@gmail.com>
>>>> wrote:
>>>>>>
>>>>>>> Thanks Bruno,
>>>>>>>
>>>>>>> Marking this as accepted.
>>>>>>>
>>>>>>> Thanks everyone for their comments/feedback.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Sagar.
>>>>>>>
>>>>>>> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <ca...@apache.org>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Sagar,
>>>>>>>>
>>>>>>>> Thanks for the update and the PR!
>>>>>>>>
>>>>>>>> +1 (binding)
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Bruno
>>>>>>>>
>>>>>>>> On 10.09.22 18:57, Sagar wrote:
>>>>>>>>> Hi Bruno,
>>>>>>>>>
>>>>>>>>> Thanks, I think these changes make sense to me. I have updated the
>>>> KIP
>>>>>>>>> accordingly.
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> Sagar.
>>>>>>>>>
>>>>>>>>> On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <ca...@apache.org>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Sagar,
>>>>>>>>>>
>>>>>>>>>> I would not drop the support for dropping records. I would also
>> not
>>>>>>>>>> return null from partitions(). Maybe an Optional can help here. An
>>>>>>>> empty
>>>>>>>>>> Optional would mean to use the default partitioning behavior of
>> the
>>>>>>>>>> producer. So we would have:
>>>>>>>>>>
>>>>>>>>>> - non-empty Optional, non-empty list of integers: partitions to
>> send
>>>>>>>> the
>>>>>>>>>> record to
>>>>>>>>>> - non-empty Optional, empty list of integers: drop the record
>>>>>>>>>> - empty Optional: use default behavior
>>>>>>>>>>
>>>>>>>>>> What do other think?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Bruno
>>>>>>>>>>
>>>>>>>>>> On 02.09.22 13:53, Sagar wrote:
>>>>>>>>>>> Hello Bruno/Chris,
>>>>>>>>>>>
>>>>>>>>>>> Since these are the last set of changes(I am assuming haha), it
>>>>>> would
>>>>>>>> be
>>>>>>>>>>> great if you could review the 2 options from above so that we can
>>>>>>>> close
>>>>>>>>>> the
>>>>>>>>>>> voting. Of course I am happy to incorporate any other requisite
>>>>>>>> changes.
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>> Sagar.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <
>> sagarmeansocean@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Bruno for the great points.
>>>>>>>>>>>>
>>>>>>>>>>>> I see 2 options here =>
>>>>>>>>>>>>
>>>>>>>>>>>> 1) As Chris suggested, drop the support for dropping records in
>>>> the
>>>>>>>>>>>> partitioner. That way, an empty list could signify the usage of
>> a
>>>>>>>>>> default
>>>>>>>>>>>> partitioner. Also, if the deprecated partition() method returns
>>>>>> null
>>>>>>>>>>>> thereby signifying the default partitioner, the partitions() can
>>>>>>>> return
>>>>>>>>>> an
>>>>>>>>>>>> empty list i.e default partitioner.
>>>>>>>>>>>>
>>>>>>>>>>>> 2) OR we treat a null return type of partitions() method to
>>>> signify
>>>>>>>> the
>>>>>>>>>>>> usage of the default partitioner. In the default implementation
>> of
>>>>>>>>>>>> partitions() method, if partition() returns null, then even
>>>>>>>> partitions()
>>>>>>>>>>>> can return null(instead of an empty list). The
>> RecordCollectorImpl
>>>>>>>> code
>>>>>>>>>> can
>>>>>>>>>>>> also be modified accordingly. @Chris, to your point, we can even
>>>>>> drop
>>>>>>>>>> the
>>>>>>>>>>>> support of dropping of records. It came up during KIP
>> discussion,
>>>>>>>> and I
>>>>>>>>>>>> thought it might be a useful feature. Let me know what you
>> think.
>>>>>>>>>>>>
>>>>>>>>>>>> 3) Lastly about the partition number check. I wanted to avoid
>> the
>>>>>>>>>> throwing
>>>>>>>>>>>> of exception so I thought adding it might be a useful feature.
>> But
>>>>>> as
>>>>>>>>>> you
>>>>>>>>>>>> pointed out, if it can break backwards compatibility, it's
>> better
>>>>>> to
>>>>>>>>>> remove
>>>>>>>>>>>> it.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>> Sagar.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton
>>>>>>>> <ch...@aiven.io.invalid>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> +1 to Bruno's concerns about backward compatibility. Do we
>>>>>> actually
>>>>>>>>>> need
>>>>>>>>>>>>> support for dropping records in the partitioner? It doesn't
>> seem
>>>>>>>>>> necessary
>>>>>>>>>>>>> based on the motivation for the KIP. If we remove that feature,
>>>> we
>>>>>>>>>> could
>>>>>>>>>>>>> handle null and/or empty lists by using the default
>> partitioning,
>>>>>>>>>>>>> equivalent to how we handle null return values from the
>> existing
>>>>>>>>>> partition
>>>>>>>>>>>>> method today.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <
>>>> cadonna@apache.org
>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Sagar,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you for the updates!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I do not intend to prolong this vote thread more than needed,
>>>>>> but I
>>>>>>>>>>>>>> still have some points.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The deprecated partition method can return null if the default
>>>>>>>>>>>>>> partitioning logic of the producer should be used.
>>>>>>>>>>>>>> With the new method partitions() it seems that it is not
>>>> possible
>>>>>>>> to
>>>>>>>>>> use
>>>>>>>>>>>>>> the default partitioning logic, anymore.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, in the default implementation of method partitions(), a
>>>>>>>> record
>>>>>>>>>>>>>> that would use the default partitioning logic in method
>>>>>> partition()
>>>>>>>>>>>>>> would be dropped, which would break backward compatibility
>> since
>>>>>>>>>> Streams
>>>>>>>>>>>>>> would always call the new method partitions() even though the
>>>>>> users
>>>>>>>>>>>>>> still implement the deprecated method partition().
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have a last point that we should probably discuss on the PR
>>>> and
>>>>>>>> not
>>>>>>>>>> on
>>>>>>>>>>>>>> the KIP but since you added the code in the KIP I need to
>>>> mention
>>>>>>>> it.
>>>>>>>>>> I
>>>>>>>>>>>>>> do not think you should check the validity of the partition
>>>>>> number
>>>>>>>>>> since
>>>>>>>>>>>>>> the ProducerRecord does the same check and throws an
>> exception.
>>>>>> If
>>>>>>>>>>>>>> Streams adds the same check but does not throw, the behavior
>> is
>>>>>> not
>>>>>>>>>>>>>> backward compatible.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Bruno
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 30.08.22 12:43, Sagar wrote:
>>>>>>>>>>>>>>> Thanks Bruno/Chris,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Even I agree that might be better to keep it simple like the
>>>> way
>>>>>>>>>> Chris
>>>>>>>>>>>>>>> suggested. I have updated the KIP accordingly. I made couple
>> of
>>>>>>>> minor
>>>>>>>>>>>>>>> changes to the KIP:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1) One of them being the change of return type of partitions
>>>>>>>> method
>>>>>>>>>>>>> from
>>>>>>>>>>>>>>> List to Set. This is to ensure that in case the
>> implementation
>>>>>> of
>>>>>>>>>>>>>>> StreamPartitoner is buggy and ends up returning duplicate
>>>>>>>>>>>>>>> partition numbers, we won't have duplicates thereby not
>> trying
>>>>>> to
>>>>>>>>>>>>> send to
>>>>>>>>>>>>>>> the same partition multiple times due to this.
>>>>>>>>>>>>>>> 2) I also added a check to send the record only to valid
>>>>>> partition
>>>>>>>>>>>>>> numbers
>>>>>>>>>>>>>>> and log and drop when the partition number is invalid. This
>> is
>>>>>>>> again
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> prevent errors for cases when the StreamPartitioner
>>>>>> implementation
>>>>>>>>>> has
>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>> bugs (since there are no validations as such).
>>>>>>>>>>>>>>> 3) I also updated the Test Plan section based on the
>> suggestion
>>>>>>>> from
>>>>>>>>>>>>>> Bruno.
>>>>>>>>>>>>>>> 4) I updated the default implementation of partitions method
>>>>>>>> based on
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> great catch from Chris!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Let me know if it looks fine now.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>> Sagar.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <
>>>>>> cadonna@apache.org
>>>>>>>>>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am favour of discarding the sugar for broadcasting and
>> leave
>>>>>>>> the
>>>>>>>>>>>>>>>> broadcasting to the implementation as Chris suggests. I
>> think
>>>>>>>> that
>>>>>>>>>> is
>>>>>>>>>>>>>>>> the cleanest option.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Bruno
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I think it'd be useful to be more explicit about
>> broadcasting
>>>>>> to
>>>>>>>>>> all
>>>>>>>>>>>>>>>> topic
>>>>>>>>>>>>>>>>> partitions rather than add implicit behavior for empty
>> cases
>>>>>>>> (empty
>>>>>>>>>>>>>>>>> optional, empty list, etc.). The suggested enum approach
>>>> would
>>>>>>>>>>>>> address
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>> nicely.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It's also worth noting that there's no hard requirement to
>>>> add
>>>>>>>>>> sugar
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>> broadcasting to all topic partitions since the API already
>>>>>>>> provides
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> number of topic partitions available when calling a stream
>>>>>>>>>>>>> partitioner.
>>>>>>>>>>>>>>>> If
>>>>>>>>>>>>>>>>> we can't find a clean way to add this support, it might be
>>>>>>>> better
>>>>>>>>>> to
>>>>>>>>>>>>>>>> leave
>>>>>>>>>>>>>>>>> it out and just let people implement this themselves with a
>>>>>>>> line of
>>>>>>>>>>>>>> Java
>>>>>>>>>>>>>>>> 8
>>>>>>>>>>>>>>>>> streams:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>            return IntStream.range(0,
>>>>>>>>>>>>>>>>> numPartitions).boxed().collect(Collectors.toList());
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Also worth noting that there may be a bug in the default
>>>>>>>>>>>>> implementation
>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>> the new StreamPartitioner::partitions method, since it
>>>> doesn't
>>>>>>>>>>>>> appear
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> correctly pick up on null return values from the partition
>>>>>>>> method
>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> translate them into empty lists.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Chris
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <
>>>>>>>> cadonna@apache.org>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Sagar,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I do not see an issue with using an empty list together
>> with
>>>>>> an
>>>>>>>>>>>>> empty
>>>>>>>>>>>>>>>>>> Optional. A non-empty Optional that contains an empty list
>>>>>>>> would
>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>>> say that there is no partition to which the record should
>> be
>>>>>>>> sent.
>>>>>>>>>>>>> If
>>>>>>>>>>>>>>>>>> there is no partition, the record is dropped.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> An empty Optional might be a way to say, broadcast to all
>>>>>>>>>>>>> partitions.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Alternatively -- to make it more explicit -- you could
>>>> return
>>>>>>>> an
>>>>>>>>>>>>>> object
>>>>>>>>>>>>>>>>>> with an enum and a list of partitions. The enum could have
>>>>>>>> values
>>>>>>>>>>>>>> SOME,
>>>>>>>>>>>>>>>>>> ALL, and NONE. If the value is SOME, the list of
>> partitions
>>>>>>>>>>>>> contains
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> partitions to which to send the record. If the value of
>> the
>>>>>>>> enum
>>>>>>>>>> is
>>>>>>>>>>>>>> ALL
>>>>>>>>>>>>>>>>>> or NONE, the list of partitions is not used and might be
>>>> even
>>>>>>>> null
>>>>>>>>>>>>>>>>>> (since in that case the list should not be used and it
>> would
>>>>>>>> be a
>>>>>>>>>>>>> bug
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> use it).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>> Bruno
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 24.08.22 20:11, Sagar wrote:
>>>>>>>>>>>>>>>>>>> Thank you Bruno/Matthew for your comments.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I agree using null does seem error prone. However I think
>>>>>>>> using a
>>>>>>>>>>>>>>>>>> singleton
>>>>>>>>>>>>>>>>>>> list of [-1] might be better in terms of usability, I am
>>>>>>>> saying
>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>> because the KIP also has a provision to return an empty
>>>> list
>>>>>>>> to
>>>>>>>>>>>>> refer
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> dropping the record. So, an empty optional and an empty
>>>> list
>>>>>>>> have
>>>>>>>>>>>>>>>> totally
>>>>>>>>>>>>>>>>>>> different meanings which could get confusing.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Let me know what you think.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>>>> Sagar.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de
>> Detrich
>>>>>>>>>>>>>>>>>>> <ma...@aiven.io.invalid> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I also concur with this, having an Optional in the type
>>>>>>>> makes it
>>>>>>>>>>>>>> very
>>>>>>>>>>>>>>>>>>>> clear what’s going on and better signifies an absence of
>>>>>>>> value
>>>>>>>>>>>>> (or
>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>> case the broadcast value).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>> Matthew de Detrich
>>>>>>>>>>>>>>>>>>>> Aiven Deutschland GmbH
>>>>>>>>>>>>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin
>>>>>>>>>>>>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>>>>>>>>>>>>>>>>>>>> m: +491603708037
>>>>>>>>>>>>>>>>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
>>>>>>>>>>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org,
>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 2.
>>>>>>>>>>>>>>>>>>>>> I would prefer changing the return type of partitions()
>>>> to
>>>>>>>>>>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as
>> the
>>>>>>>>>>>>> broadcast
>>>>>>>>>>>>>>>>>>>>> value. IMO, The chances that an implementation returns
>>>>>> null
>>>>>>>> due
>>>>>>>>>>>>> to
>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>> bug
>>>>>>>>>>>>>>>>>>>>> is much higher than that an implementation returns an
>>>>>> empty
>>>>>>>>>>>>>> Optional
>>>>>>>>>>>>>>>>>> due
>>>>>>>>>>>>>>>>>>>>> to a bug.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> 

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sagar <sa...@gmail.com>.
Thanks Matthias,

Well, as things stand, we did have internal discussions on this and it
seemed ok to open it up for IQ and more importantly not ok to have it
opened up for FK-Join. And more importantly, the PR for this is already
merged and some of these things came up during that. Here's the PR link:
https://github.com/apache/kafka/pull/12803.

Thanks!
Sagar.


On Fri, Dec 9, 2022 at 5:15 AM Matthias J. Sax <mj...@apache.org> wrote:

> Ah. Missed it as it does not have a nice "code block" similar to
> `StreamPartitioner` changes.
>
> I understand the motivation, but I am wondering if we might head into a
> tricky direction? State stores (at least the built-in ones) and IQ are
> kinda build with the idea to have sharded data and that a multi-cast of
> keys is an anti-pattern?
>
> Maybe it's fine, but I also don't want to open Pandora's Box. Are we
> sure that generalizing the concepts does not cause issues in the future?
>
> Ie, should we claim that the multi-cast feature should be used for
> KStreams only, but not for KTables?
>
> Just want to double check that we are not doing something we regret later.
>
>
> -Matthias
>
>
> On 12/7/22 6:45 PM, Sagar wrote:
> > Hi Mathias,
> >
> > I did save it. The changes are added under Public Interfaces (Pt#2 about
> > enhancing KeyQueryMetadata with partitions method) and
> > throwing IllegalArgumentException when StreamPartitioner#partitions
> method
> > returns multiple partitions for just FK-join instead of the earlier
> decided
> > FK-Join and IQ.
> >
> > The background is that for IQ, if the users have multi casted records to
> > multiple partitions during ingestion but the fetch returns only a single
> > partition, then it would be wrong. That's why the restriction was lifted
> > for IQ and that's the reason KeyQueryMetadata now has another
> partitions()
> > method to signify the same.
> >
> > FK-Join also has a similar case, but while reviewing it was felt that
> > FK-Join on it's own is fairly complicated and we don't need this feature
> > right away so the restriction still exists.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> I don't see any update on the wiki about it. Did you forget to hit
> "save"?
> >>
> >> Can you also provide some background? I am not sure right now if I
> >> understand the proposed changes?
> >>
> >>
> >> -Matthias
> >>
> >> On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
> >>> Thanks Sagar, this makes sense to me -- we clearly need additional
> >> changes
> >>> to
> >>> avoid breaking IQ when using this feature, but I agree with continuing
> to
> >>> restrict
> >>> FKJ since they wouldn't stop working without it, and would become much
> >>> harder
> >>> to reason about (than they already are) if we did enable them to use
> it.
> >>>
> >>> And of course, they can still multicast the final results of a FKJ,
> they
> >>> just can't
> >>> mess with the internal workings of it in this way.
> >>>
> >>> On Tue, Dec 6, 2022 at 9:48 AM Sagar <sa...@gmail.com>
> wrote:
> >>>
> >>>> Hi All,
> >>>>
> >>>> I made a couple of edits to the KIP which came up during the code
> >> review.
> >>>> Changes at a high level are:
> >>>>
> >>>> 1) KeyQueryMetada enhanced to have a new method called partitions().
> >>>> 2) Lifting the restriction of a single partition for IQ. Now the
> >>>> restriction holds only for FK Join.
> >>>>
> >>>> Updated KIP:
> >>>>
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> >>>>
> >>>> Thanks!
> >>>> Sagar.
> >>>>
> >>>> On Mon, Sep 12, 2022 at 6:43 PM Sagar <sa...@gmail.com>
> >> wrote:
> >>>>
> >>>>> Thanks Bruno,
> >>>>>
> >>>>> Marking this as accepted.
> >>>>>
> >>>>> Thanks everyone for their comments/feedback.
> >>>>>
> >>>>> Thanks!
> >>>>> Sagar.
> >>>>>
> >>>>> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <ca...@apache.org>
> >>>> wrote:
> >>>>>
> >>>>>> Hi Sagar,
> >>>>>>
> >>>>>> Thanks for the update and the PR!
> >>>>>>
> >>>>>> +1 (binding)
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> On 10.09.22 18:57, Sagar wrote:
> >>>>>>> Hi Bruno,
> >>>>>>>
> >>>>>>> Thanks, I think these changes make sense to me. I have updated the
> >> KIP
> >>>>>>> accordingly.
> >>>>>>>
> >>>>>>> Thanks!
> >>>>>>> Sagar.
> >>>>>>>
> >>>>>>> On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <ca...@apache.org>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Sagar,
> >>>>>>>>
> >>>>>>>> I would not drop the support for dropping records. I would also
> not
> >>>>>>>> return null from partitions(). Maybe an Optional can help here. An
> >>>>>> empty
> >>>>>>>> Optional would mean to use the default partitioning behavior of
> the
> >>>>>>>> producer. So we would have:
> >>>>>>>>
> >>>>>>>> - non-empty Optional, non-empty list of integers: partitions to
> send
> >>>>>> the
> >>>>>>>> record to
> >>>>>>>> - non-empty Optional, empty list of integers: drop the record
> >>>>>>>> - empty Optional: use default behavior
> >>>>>>>>
> >>>>>>>> What do other think?
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Bruno
> >>>>>>>>
> >>>>>>>> On 02.09.22 13:53, Sagar wrote:
> >>>>>>>>> Hello Bruno/Chris,
> >>>>>>>>>
> >>>>>>>>> Since these are the last set of changes(I am assuming haha), it
> >>>> would
> >>>>>> be
> >>>>>>>>> great if you could review the 2 options from above so that we can
> >>>>>> close
> >>>>>>>> the
> >>>>>>>>> voting. Of course I am happy to incorporate any other requisite
> >>>>>> changes.
> >>>>>>>>>
> >>>>>>>>> Thanks!
> >>>>>>>>> Sagar.
> >>>>>>>>>
> >>>>>>>>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <
> sagarmeansocean@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks Bruno for the great points.
> >>>>>>>>>>
> >>>>>>>>>> I see 2 options here =>
> >>>>>>>>>>
> >>>>>>>>>> 1) As Chris suggested, drop the support for dropping records in
> >> the
> >>>>>>>>>> partitioner. That way, an empty list could signify the usage of
> a
> >>>>>>>> default
> >>>>>>>>>> partitioner. Also, if the deprecated partition() method returns
> >>>> null
> >>>>>>>>>> thereby signifying the default partitioner, the partitions() can
> >>>>>> return
> >>>>>>>> an
> >>>>>>>>>> empty list i.e default partitioner.
> >>>>>>>>>>
> >>>>>>>>>> 2) OR we treat a null return type of partitions() method to
> >> signify
> >>>>>> the
> >>>>>>>>>> usage of the default partitioner. In the default implementation
> of
> >>>>>>>>>> partitions() method, if partition() returns null, then even
> >>>>>> partitions()
> >>>>>>>>>> can return null(instead of an empty list). The
> RecordCollectorImpl
> >>>>>> code
> >>>>>>>> can
> >>>>>>>>>> also be modified accordingly. @Chris, to your point, we can even
> >>>> drop
> >>>>>>>> the
> >>>>>>>>>> support of dropping of records. It came up during KIP
> discussion,
> >>>>>> and I
> >>>>>>>>>> thought it might be a useful feature. Let me know what you
> think.
> >>>>>>>>>>
> >>>>>>>>>> 3) Lastly about the partition number check. I wanted to avoid
> the
> >>>>>>>> throwing
> >>>>>>>>>> of exception so I thought adding it might be a useful feature.
> But
> >>>> as
> >>>>>>>> you
> >>>>>>>>>> pointed out, if it can break backwards compatibility, it's
> better
> >>>> to
> >>>>>>>> remove
> >>>>>>>>>> it.
> >>>>>>>>>>
> >>>>>>>>>> Thanks!
> >>>>>>>>>> Sagar.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton
> >>>>>> <ch...@aiven.io.invalid>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> +1 to Bruno's concerns about backward compatibility. Do we
> >>>> actually
> >>>>>>>> need
> >>>>>>>>>>> support for dropping records in the partitioner? It doesn't
> seem
> >>>>>>>> necessary
> >>>>>>>>>>> based on the motivation for the KIP. If we remove that feature,
> >> we
> >>>>>>>> could
> >>>>>>>>>>> handle null and/or empty lists by using the default
> partitioning,
> >>>>>>>>>>> equivalent to how we handle null return values from the
> existing
> >>>>>>>> partition
> >>>>>>>>>>> method today.
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <
> >> cadonna@apache.org
> >>>>>
> >>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Sagar,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thank you for the updates!
> >>>>>>>>>>>>
> >>>>>>>>>>>> I do not intend to prolong this vote thread more than needed,
> >>>> but I
> >>>>>>>>>>>> still have some points.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The deprecated partition method can return null if the default
> >>>>>>>>>>>> partitioning logic of the producer should be used.
> >>>>>>>>>>>> With the new method partitions() it seems that it is not
> >> possible
> >>>>>> to
> >>>>>>>> use
> >>>>>>>>>>>> the default partitioning logic, anymore.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Also, in the default implementation of method partitions(), a
> >>>>>> record
> >>>>>>>>>>>> that would use the default partitioning logic in method
> >>>> partition()
> >>>>>>>>>>>> would be dropped, which would break backward compatibility
> since
> >>>>>>>> Streams
> >>>>>>>>>>>> would always call the new method partitions() even though the
> >>>> users
> >>>>>>>>>>>> still implement the deprecated method partition().
> >>>>>>>>>>>>
> >>>>>>>>>>>> I have a last point that we should probably discuss on the PR
> >> and
> >>>>>> not
> >>>>>>>> on
> >>>>>>>>>>>> the KIP but since you added the code in the KIP I need to
> >> mention
> >>>>>> it.
> >>>>>>>> I
> >>>>>>>>>>>> do not think you should check the validity of the partition
> >>>> number
> >>>>>>>> since
> >>>>>>>>>>>> the ProducerRecord does the same check and throws an
> exception.
> >>>> If
> >>>>>>>>>>>> Streams adds the same check but does not throw, the behavior
> is
> >>>> not
> >>>>>>>>>>>> backward compatible.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Bruno
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 30.08.22 12:43, Sagar wrote:
> >>>>>>>>>>>>> Thanks Bruno/Chris,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Even I agree that might be better to keep it simple like the
> >> way
> >>>>>>>> Chris
> >>>>>>>>>>>>> suggested. I have updated the KIP accordingly. I made couple
> of
> >>>>>> minor
> >>>>>>>>>>>>> changes to the KIP:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1) One of them being the change of return type of partitions
> >>>>>> method
> >>>>>>>>>>> from
> >>>>>>>>>>>>> List to Set. This is to ensure that in case the
> implementation
> >>>> of
> >>>>>>>>>>>>> StreamPartitoner is buggy and ends up returning duplicate
> >>>>>>>>>>>>> partition numbers, we won't have duplicates thereby not
> trying
> >>>> to
> >>>>>>>>>>> send to
> >>>>>>>>>>>>> the same partition multiple times due to this.
> >>>>>>>>>>>>> 2) I also added a check to send the record only to valid
> >>>> partition
> >>>>>>>>>>>> numbers
> >>>>>>>>>>>>> and log and drop when the partition number is invalid. This
> is
> >>>>>> again
> >>>>>>>>>>> to
> >>>>>>>>>>>>> prevent errors for cases when the StreamPartitioner
> >>>> implementation
> >>>>>>>> has
> >>>>>>>>>>>> some
> >>>>>>>>>>>>> bugs (since there are no validations as such).
> >>>>>>>>>>>>> 3) I also updated the Test Plan section based on the
> suggestion
> >>>>>> from
> >>>>>>>>>>>> Bruno.
> >>>>>>>>>>>>> 4) I updated the default implementation of partitions method
> >>>>>> based on
> >>>>>>>>>>> the
> >>>>>>>>>>>>> great catch from Chris!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Let me know if it looks fine now.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>> Sagar.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <
> >>>> cadonna@apache.org
> >>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I am favour of discarding the sugar for broadcasting and
> leave
> >>>>>> the
> >>>>>>>>>>>>>> broadcasting to the implementation as Chris suggests. I
> think
> >>>>>> that
> >>>>>>>> is
> >>>>>>>>>>>>>> the cleanest option.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
> >>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I think it'd be useful to be more explicit about
> broadcasting
> >>>> to
> >>>>>>>> all
> >>>>>>>>>>>>>> topic
> >>>>>>>>>>>>>>> partitions rather than add implicit behavior for empty
> cases
> >>>>>> (empty
> >>>>>>>>>>>>>>> optional, empty list, etc.). The suggested enum approach
> >> would
> >>>>>>>>>>> address
> >>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>> nicely.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> It's also worth noting that there's no hard requirement to
> >> add
> >>>>>>>> sugar
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>> broadcasting to all topic partitions since the API already
> >>>>>> provides
> >>>>>>>>>>> the
> >>>>>>>>>>>>>>> number of topic partitions available when calling a stream
> >>>>>>>>>>> partitioner.
> >>>>>>>>>>>>>> If
> >>>>>>>>>>>>>>> we can't find a clean way to add this support, it might be
> >>>>>> better
> >>>>>>>> to
> >>>>>>>>>>>>>> leave
> >>>>>>>>>>>>>>> it out and just let people implement this themselves with a
> >>>>>> line of
> >>>>>>>>>>>> Java
> >>>>>>>>>>>>>> 8
> >>>>>>>>>>>>>>> streams:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>           return IntStream.range(0,
> >>>>>>>>>>>>>>> numPartitions).boxed().collect(Collectors.toList());
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Also worth noting that there may be a bug in the default
> >>>>>>>>>>> implementation
> >>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>> the new StreamPartitioner::partitions method, since it
> >> doesn't
> >>>>>>>>>>> appear
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>> correctly pick up on null return values from the partition
> >>>>>> method
> >>>>>>>>>>> and
> >>>>>>>>>>>>>>> translate them into empty lists.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <
> >>>>>> cadonna@apache.org>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Sagar,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I do not see an issue with using an empty list together
> with
> >>>> an
> >>>>>>>>>>> empty
> >>>>>>>>>>>>>>>> Optional. A non-empty Optional that contains an empty list
> >>>>>> would
> >>>>>>>>>>> just
> >>>>>>>>>>>>>>>> say that there is no partition to which the record should
> be
> >>>>>> sent.
> >>>>>>>>>>> If
> >>>>>>>>>>>>>>>> there is no partition, the record is dropped.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> An empty Optional might be a way to say, broadcast to all
> >>>>>>>>>>> partitions.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Alternatively -- to make it more explicit -- you could
> >> return
> >>>>>> an
> >>>>>>>>>>>> object
> >>>>>>>>>>>>>>>> with an enum and a list of partitions. The enum could have
> >>>>>> values
> >>>>>>>>>>>> SOME,
> >>>>>>>>>>>>>>>> ALL, and NONE. If the value is SOME, the list of
> partitions
> >>>>>>>>>>> contains
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>> partitions to which to send the record. If the value of
> the
> >>>>>> enum
> >>>>>>>> is
> >>>>>>>>>>>> ALL
> >>>>>>>>>>>>>>>> or NONE, the list of partitions is not used and might be
> >> even
> >>>>>> null
> >>>>>>>>>>>>>>>> (since in that case the list should not be used and it
> would
> >>>>>> be a
> >>>>>>>>>>> bug
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>> use it).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 24.08.22 20:11, Sagar wrote:
> >>>>>>>>>>>>>>>>> Thank you Bruno/Matthew for your comments.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I agree using null does seem error prone. However I think
> >>>>>> using a
> >>>>>>>>>>>>>>>> singleton
> >>>>>>>>>>>>>>>>> list of [-1] might be better in terms of usability, I am
> >>>>>> saying
> >>>>>>>>>>> this
> >>>>>>>>>>>>>>>>> because the KIP also has a provision to return an empty
> >> list
> >>>>>> to
> >>>>>>>>>>> refer
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>> dropping the record. So, an empty optional and an empty
> >> list
> >>>>>> have
> >>>>>>>>>>>>>> totally
> >>>>>>>>>>>>>>>>> different meanings which could get confusing.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Let me know what you think.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>>>> Sagar.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de
> Detrich
> >>>>>>>>>>>>>>>>> <ma...@aiven.io.invalid> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I also concur with this, having an Optional in the type
> >>>>>> makes it
> >>>>>>>>>>>> very
> >>>>>>>>>>>>>>>>>> clear what’s going on and better signifies an absence of
> >>>>>> value
> >>>>>>>>>>> (or
> >>>>>>>>>>>> in
> >>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>> case the broadcast value).
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>> Matthew de Detrich
> >>>>>>>>>>>>>>>>>> Aiven Deutschland GmbH
> >>>>>>>>>>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin
> >>>>>>>>>>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> >>>>>>>>>>>>>>>>>> m: +491603708037
> >>>>>>>>>>>>>>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
> >>>>>>>>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org,
> >> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>> I would prefer changing the return type of partitions()
> >> to
> >>>>>>>>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as
> the
> >>>>>>>>>>> broadcast
> >>>>>>>>>>>>>>>>>>> value. IMO, The chances that an implementation returns
> >>>> null
> >>>>>> due
> >>>>>>>>>>> to
> >>>>>>>>>>>> a
> >>>>>>>>>>>>>>>> bug
> >>>>>>>>>>>>>>>>>>> is much higher than that an implementation returns an
> >>>> empty
> >>>>>>>>>>>> Optional
> >>>>>>>>>>>>>>>> due
> >>>>>>>>>>>>>>>>>>> to a bug.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by "Matthias J. Sax" <mj...@apache.org>.
Ah. Missed it as it does not have a nice "code block" similar to 
`StreamPartitioner` changes.

I understand the motivation, but I am wondering if we might head into a 
tricky direction? State stores (at least the built-in ones) and IQ are 
kinda build with the idea to have sharded data and that a multi-cast of 
keys is an anti-pattern?

Maybe it's fine, but I also don't want to open Pandora's Box. Are we 
sure that generalizing the concepts does not cause issues in the future?

Ie, should we claim that the multi-cast feature should be used for 
KStreams only, but not for KTables?

Just want to double check that we are not doing something we regret later.


-Matthias


On 12/7/22 6:45 PM, Sagar wrote:
> Hi Mathias,
> 
> I did save it. The changes are added under Public Interfaces (Pt#2 about
> enhancing KeyQueryMetadata with partitions method) and
> throwing IllegalArgumentException when StreamPartitioner#partitions method
> returns multiple partitions for just FK-join instead of the earlier decided
> FK-Join and IQ.
> 
> The background is that for IQ, if the users have multi casted records to
> multiple partitions during ingestion but the fetch returns only a single
> partition, then it would be wrong. That's why the restriction was lifted
> for IQ and that's the reason KeyQueryMetadata now has another partitions()
> method to signify the same.
> 
> FK-Join also has a similar case, but while reviewing it was felt that
> FK-Join on it's own is fairly complicated and we don't need this feature
> right away so the restriction still exists.
> 
> Thanks!
> Sagar.
> 
> 
> On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax <mj...@apache.org> wrote:
> 
>> I don't see any update on the wiki about it. Did you forget to hit "save"?
>>
>> Can you also provide some background? I am not sure right now if I
>> understand the proposed changes?
>>
>>
>> -Matthias
>>
>> On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
>>> Thanks Sagar, this makes sense to me -- we clearly need additional
>> changes
>>> to
>>> avoid breaking IQ when using this feature, but I agree with continuing to
>>> restrict
>>> FKJ since they wouldn't stop working without it, and would become much
>>> harder
>>> to reason about (than they already are) if we did enable them to use it.
>>>
>>> And of course, they can still multicast the final results of a FKJ, they
>>> just can't
>>> mess with the internal workings of it in this way.
>>>
>>> On Tue, Dec 6, 2022 at 9:48 AM Sagar <sa...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I made a couple of edits to the KIP which came up during the code
>> review.
>>>> Changes at a high level are:
>>>>
>>>> 1) KeyQueryMetada enhanced to have a new method called partitions().
>>>> 2) Lifting the restriction of a single partition for IQ. Now the
>>>> restriction holds only for FK Join.
>>>>
>>>> Updated KIP:
>>>>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
>>>>
>>>> Thanks!
>>>> Sagar.
>>>>
>>>> On Mon, Sep 12, 2022 at 6:43 PM Sagar <sa...@gmail.com>
>> wrote:
>>>>
>>>>> Thanks Bruno,
>>>>>
>>>>> Marking this as accepted.
>>>>>
>>>>> Thanks everyone for their comments/feedback.
>>>>>
>>>>> Thanks!
>>>>> Sagar.
>>>>>
>>>>> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <ca...@apache.org>
>>>> wrote:
>>>>>
>>>>>> Hi Sagar,
>>>>>>
>>>>>> Thanks for the update and the PR!
>>>>>>
>>>>>> +1 (binding)
>>>>>>
>>>>>> Best,
>>>>>> Bruno
>>>>>>
>>>>>> On 10.09.22 18:57, Sagar wrote:
>>>>>>> Hi Bruno,
>>>>>>>
>>>>>>> Thanks, I think these changes make sense to me. I have updated the
>> KIP
>>>>>>> accordingly.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Sagar.
>>>>>>>
>>>>>>> On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <ca...@apache.org>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Sagar,
>>>>>>>>
>>>>>>>> I would not drop the support for dropping records. I would also not
>>>>>>>> return null from partitions(). Maybe an Optional can help here. An
>>>>>> empty
>>>>>>>> Optional would mean to use the default partitioning behavior of the
>>>>>>>> producer. So we would have:
>>>>>>>>
>>>>>>>> - non-empty Optional, non-empty list of integers: partitions to send
>>>>>> the
>>>>>>>> record to
>>>>>>>> - non-empty Optional, empty list of integers: drop the record
>>>>>>>> - empty Optional: use default behavior
>>>>>>>>
>>>>>>>> What do other think?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Bruno
>>>>>>>>
>>>>>>>> On 02.09.22 13:53, Sagar wrote:
>>>>>>>>> Hello Bruno/Chris,
>>>>>>>>>
>>>>>>>>> Since these are the last set of changes(I am assuming haha), it
>>>> would
>>>>>> be
>>>>>>>>> great if you could review the 2 options from above so that we can
>>>>>> close
>>>>>>>> the
>>>>>>>>> voting. Of course I am happy to incorporate any other requisite
>>>>>> changes.
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> Sagar.
>>>>>>>>>
>>>>>>>>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <sa...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Bruno for the great points.
>>>>>>>>>>
>>>>>>>>>> I see 2 options here =>
>>>>>>>>>>
>>>>>>>>>> 1) As Chris suggested, drop the support for dropping records in
>> the
>>>>>>>>>> partitioner. That way, an empty list could signify the usage of a
>>>>>>>> default
>>>>>>>>>> partitioner. Also, if the deprecated partition() method returns
>>>> null
>>>>>>>>>> thereby signifying the default partitioner, the partitions() can
>>>>>> return
>>>>>>>> an
>>>>>>>>>> empty list i.e default partitioner.
>>>>>>>>>>
>>>>>>>>>> 2) OR we treat a null return type of partitions() method to
>> signify
>>>>>> the
>>>>>>>>>> usage of the default partitioner. In the default implementation of
>>>>>>>>>> partitions() method, if partition() returns null, then even
>>>>>> partitions()
>>>>>>>>>> can return null(instead of an empty list). The RecordCollectorImpl
>>>>>> code
>>>>>>>> can
>>>>>>>>>> also be modified accordingly. @Chris, to your point, we can even
>>>> drop
>>>>>>>> the
>>>>>>>>>> support of dropping of records. It came up during KIP discussion,
>>>>>> and I
>>>>>>>>>> thought it might be a useful feature. Let me know what you think.
>>>>>>>>>>
>>>>>>>>>> 3) Lastly about the partition number check. I wanted to avoid the
>>>>>>>> throwing
>>>>>>>>>> of exception so I thought adding it might be a useful feature. But
>>>> as
>>>>>>>> you
>>>>>>>>>> pointed out, if it can break backwards compatibility, it's better
>>>> to
>>>>>>>> remove
>>>>>>>>>> it.
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>> Sagar.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton
>>>>>> <ch...@aiven.io.invalid>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> +1 to Bruno's concerns about backward compatibility. Do we
>>>> actually
>>>>>>>> need
>>>>>>>>>>> support for dropping records in the partitioner? It doesn't seem
>>>>>>>> necessary
>>>>>>>>>>> based on the motivation for the KIP. If we remove that feature,
>> we
>>>>>>>> could
>>>>>>>>>>> handle null and/or empty lists by using the default partitioning,
>>>>>>>>>>> equivalent to how we handle null return values from the existing
>>>>>>>> partition
>>>>>>>>>>> method today.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <
>> cadonna@apache.org
>>>>>
>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Sagar,
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for the updates!
>>>>>>>>>>>>
>>>>>>>>>>>> I do not intend to prolong this vote thread more than needed,
>>>> but I
>>>>>>>>>>>> still have some points.
>>>>>>>>>>>>
>>>>>>>>>>>> The deprecated partition method can return null if the default
>>>>>>>>>>>> partitioning logic of the producer should be used.
>>>>>>>>>>>> With the new method partitions() it seems that it is not
>> possible
>>>>>> to
>>>>>>>> use
>>>>>>>>>>>> the default partitioning logic, anymore.
>>>>>>>>>>>>
>>>>>>>>>>>> Also, in the default implementation of method partitions(), a
>>>>>> record
>>>>>>>>>>>> that would use the default partitioning logic in method
>>>> partition()
>>>>>>>>>>>> would be dropped, which would break backward compatibility since
>>>>>>>> Streams
>>>>>>>>>>>> would always call the new method partitions() even though the
>>>> users
>>>>>>>>>>>> still implement the deprecated method partition().
>>>>>>>>>>>>
>>>>>>>>>>>> I have a last point that we should probably discuss on the PR
>> and
>>>>>> not
>>>>>>>> on
>>>>>>>>>>>> the KIP but since you added the code in the KIP I need to
>> mention
>>>>>> it.
>>>>>>>> I
>>>>>>>>>>>> do not think you should check the validity of the partition
>>>> number
>>>>>>>> since
>>>>>>>>>>>> the ProducerRecord does the same check and throws an exception.
>>>> If
>>>>>>>>>>>> Streams adds the same check but does not throw, the behavior is
>>>> not
>>>>>>>>>>>> backward compatible.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Bruno
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 30.08.22 12:43, Sagar wrote:
>>>>>>>>>>>>> Thanks Bruno/Chris,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Even I agree that might be better to keep it simple like the
>> way
>>>>>>>> Chris
>>>>>>>>>>>>> suggested. I have updated the KIP accordingly. I made couple of
>>>>>> minor
>>>>>>>>>>>>> changes to the KIP:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1) One of them being the change of return type of partitions
>>>>>> method
>>>>>>>>>>> from
>>>>>>>>>>>>> List to Set. This is to ensure that in case the implementation
>>>> of
>>>>>>>>>>>>> StreamPartitoner is buggy and ends up returning duplicate
>>>>>>>>>>>>> partition numbers, we won't have duplicates thereby not trying
>>>> to
>>>>>>>>>>> send to
>>>>>>>>>>>>> the same partition multiple times due to this.
>>>>>>>>>>>>> 2) I also added a check to send the record only to valid
>>>> partition
>>>>>>>>>>>> numbers
>>>>>>>>>>>>> and log and drop when the partition number is invalid. This is
>>>>>> again
>>>>>>>>>>> to
>>>>>>>>>>>>> prevent errors for cases when the StreamPartitioner
>>>> implementation
>>>>>>>> has
>>>>>>>>>>>> some
>>>>>>>>>>>>> bugs (since there are no validations as such).
>>>>>>>>>>>>> 3) I also updated the Test Plan section based on the suggestion
>>>>>> from
>>>>>>>>>>>> Bruno.
>>>>>>>>>>>>> 4) I updated the default implementation of partitions method
>>>>>> based on
>>>>>>>>>>> the
>>>>>>>>>>>>> great catch from Chris!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Let me know if it looks fine now.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>> Sagar.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <
>>>> cadonna@apache.org
>>>>>>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am favour of discarding the sugar for broadcasting and leave
>>>>>> the
>>>>>>>>>>>>>> broadcasting to the implementation as Chris suggests. I think
>>>>>> that
>>>>>>>> is
>>>>>>>>>>>>>> the cleanest option.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Bruno
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think it'd be useful to be more explicit about broadcasting
>>>> to
>>>>>>>> all
>>>>>>>>>>>>>> topic
>>>>>>>>>>>>>>> partitions rather than add implicit behavior for empty cases
>>>>>> (empty
>>>>>>>>>>>>>>> optional, empty list, etc.). The suggested enum approach
>> would
>>>>>>>>>>> address
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>> nicely.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It's also worth noting that there's no hard requirement to
>> add
>>>>>>>> sugar
>>>>>>>>>>>> for
>>>>>>>>>>>>>>> broadcasting to all topic partitions since the API already
>>>>>> provides
>>>>>>>>>>> the
>>>>>>>>>>>>>>> number of topic partitions available when calling a stream
>>>>>>>>>>> partitioner.
>>>>>>>>>>>>>> If
>>>>>>>>>>>>>>> we can't find a clean way to add this support, it might be
>>>>>> better
>>>>>>>> to
>>>>>>>>>>>>>> leave
>>>>>>>>>>>>>>> it out and just let people implement this themselves with a
>>>>>> line of
>>>>>>>>>>>> Java
>>>>>>>>>>>>>> 8
>>>>>>>>>>>>>>> streams:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>           return IntStream.range(0,
>>>>>>>>>>>>>>> numPartitions).boxed().collect(Collectors.toList());
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Also worth noting that there may be a bug in the default
>>>>>>>>>>> implementation
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>> the new StreamPartitioner::partitions method, since it
>> doesn't
>>>>>>>>>>> appear
>>>>>>>>>>>> to
>>>>>>>>>>>>>>> correctly pick up on null return values from the partition
>>>>>> method
>>>>>>>>>>> and
>>>>>>>>>>>>>>> translate them into empty lists.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Chris
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <
>>>>>> cadonna@apache.org>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Sagar,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I do not see an issue with using an empty list together with
>>>> an
>>>>>>>>>>> empty
>>>>>>>>>>>>>>>> Optional. A non-empty Optional that contains an empty list
>>>>>> would
>>>>>>>>>>> just
>>>>>>>>>>>>>>>> say that there is no partition to which the record should be
>>>>>> sent.
>>>>>>>>>>> If
>>>>>>>>>>>>>>>> there is no partition, the record is dropped.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> An empty Optional might be a way to say, broadcast to all
>>>>>>>>>>> partitions.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Alternatively -- to make it more explicit -- you could
>> return
>>>>>> an
>>>>>>>>>>>> object
>>>>>>>>>>>>>>>> with an enum and a list of partitions. The enum could have
>>>>>> values
>>>>>>>>>>>> SOME,
>>>>>>>>>>>>>>>> ALL, and NONE. If the value is SOME, the list of partitions
>>>>>>>>>>> contains
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> partitions to which to send the record. If the value of the
>>>>>> enum
>>>>>>>> is
>>>>>>>>>>>> ALL
>>>>>>>>>>>>>>>> or NONE, the list of partitions is not used and might be
>> even
>>>>>> null
>>>>>>>>>>>>>>>> (since in that case the list should not be used and it would
>>>>>> be a
>>>>>>>>>>> bug
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> use it).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Bruno
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 24.08.22 20:11, Sagar wrote:
>>>>>>>>>>>>>>>>> Thank you Bruno/Matthew for your comments.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I agree using null does seem error prone. However I think
>>>>>> using a
>>>>>>>>>>>>>>>> singleton
>>>>>>>>>>>>>>>>> list of [-1] might be better in terms of usability, I am
>>>>>> saying
>>>>>>>>>>> this
>>>>>>>>>>>>>>>>> because the KIP also has a provision to return an empty
>> list
>>>>>> to
>>>>>>>>>>> refer
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> dropping the record. So, an empty optional and an empty
>> list
>>>>>> have
>>>>>>>>>>>>>> totally
>>>>>>>>>>>>>>>>> different meanings which could get confusing.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Let me know what you think.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>> Sagar.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
>>>>>>>>>>>>>>>>> <ma...@aiven.io.invalid> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I also concur with this, having an Optional in the type
>>>>>> makes it
>>>>>>>>>>>> very
>>>>>>>>>>>>>>>>>> clear what’s going on and better signifies an absence of
>>>>>> value
>>>>>>>>>>> (or
>>>>>>>>>>>> in
>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>> case the broadcast value).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>> Matthew de Detrich
>>>>>>>>>>>>>>>>>> Aiven Deutschland GmbH
>>>>>>>>>>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin
>>>>>>>>>>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>>>>>>>>>>>>>>>>>> m: +491603708037
>>>>>>>>>>>>>>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
>>>>>>>>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org,
>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2.
>>>>>>>>>>>>>>>>>>> I would prefer changing the return type of partitions()
>> to
>>>>>>>>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as the
>>>>>>>>>>> broadcast
>>>>>>>>>>>>>>>>>>> value. IMO, The chances that an implementation returns
>>>> null
>>>>>> due
>>>>>>>>>>> to
>>>>>>>>>>>> a
>>>>>>>>>>>>>>>> bug
>>>>>>>>>>>>>>>>>>> is much higher than that an implementation returns an
>>>> empty
>>>>>>>>>>>> Optional
>>>>>>>>>>>>>>>> due
>>>>>>>>>>>>>>>>>>> to a bug.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> 

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sagar <sa...@gmail.com>.
Hi Mathias,

I did save it. The changes are added under Public Interfaces (Pt#2 about
enhancing KeyQueryMetadata with partitions method) and
throwing IllegalArgumentException when StreamPartitioner#partitions method
returns multiple partitions for just FK-join instead of the earlier decided
FK-Join and IQ.

The background is that for IQ, if the users have multi casted records to
multiple partitions during ingestion but the fetch returns only a single
partition, then it would be wrong. That's why the restriction was lifted
for IQ and that's the reason KeyQueryMetadata now has another partitions()
method to signify the same.

FK-Join also has a similar case, but while reviewing it was felt that
FK-Join on it's own is fairly complicated and we don't need this feature
right away so the restriction still exists.

Thanks!
Sagar.


On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax <mj...@apache.org> wrote:

> I don't see any update on the wiki about it. Did you forget to hit "save"?
>
> Can you also provide some background? I am not sure right now if I
> understand the proposed changes?
>
>
> -Matthias
>
> On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
> > Thanks Sagar, this makes sense to me -- we clearly need additional
> changes
> > to
> > avoid breaking IQ when using this feature, but I agree with continuing to
> > restrict
> > FKJ since they wouldn't stop working without it, and would become much
> > harder
> > to reason about (than they already are) if we did enable them to use it.
> >
> > And of course, they can still multicast the final results of a FKJ, they
> > just can't
> > mess with the internal workings of it in this way.
> >
> > On Tue, Dec 6, 2022 at 9:48 AM Sagar <sa...@gmail.com> wrote:
> >
> >> Hi All,
> >>
> >> I made a couple of edits to the KIP which came up during the code
> review.
> >> Changes at a high level are:
> >>
> >> 1) KeyQueryMetada enhanced to have a new method called partitions().
> >> 2) Lifting the restriction of a single partition for IQ. Now the
> >> restriction holds only for FK Join.
> >>
> >> Updated KIP:
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> >>
> >> Thanks!
> >> Sagar.
> >>
> >> On Mon, Sep 12, 2022 at 6:43 PM Sagar <sa...@gmail.com>
> wrote:
> >>
> >>> Thanks Bruno,
> >>>
> >>> Marking this as accepted.
> >>>
> >>> Thanks everyone for their comments/feedback.
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <ca...@apache.org>
> >> wrote:
> >>>
> >>>> Hi Sagar,
> >>>>
> >>>> Thanks for the update and the PR!
> >>>>
> >>>> +1 (binding)
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 10.09.22 18:57, Sagar wrote:
> >>>>> Hi Bruno,
> >>>>>
> >>>>> Thanks, I think these changes make sense to me. I have updated the
> KIP
> >>>>> accordingly.
> >>>>>
> >>>>> Thanks!
> >>>>> Sagar.
> >>>>>
> >>>>> On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <ca...@apache.org>
> >>>> wrote:
> >>>>>
> >>>>>> Hi Sagar,
> >>>>>>
> >>>>>> I would not drop the support for dropping records. I would also not
> >>>>>> return null from partitions(). Maybe an Optional can help here. An
> >>>> empty
> >>>>>> Optional would mean to use the default partitioning behavior of the
> >>>>>> producer. So we would have:
> >>>>>>
> >>>>>> - non-empty Optional, non-empty list of integers: partitions to send
> >>>> the
> >>>>>> record to
> >>>>>> - non-empty Optional, empty list of integers: drop the record
> >>>>>> - empty Optional: use default behavior
> >>>>>>
> >>>>>> What do other think?
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> On 02.09.22 13:53, Sagar wrote:
> >>>>>>> Hello Bruno/Chris,
> >>>>>>>
> >>>>>>> Since these are the last set of changes(I am assuming haha), it
> >> would
> >>>> be
> >>>>>>> great if you could review the 2 options from above so that we can
> >>>> close
> >>>>>> the
> >>>>>>> voting. Of course I am happy to incorporate any other requisite
> >>>> changes.
> >>>>>>>
> >>>>>>> Thanks!
> >>>>>>> Sagar.
> >>>>>>>
> >>>>>>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <sa...@gmail.com>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Thanks Bruno for the great points.
> >>>>>>>>
> >>>>>>>> I see 2 options here =>
> >>>>>>>>
> >>>>>>>> 1) As Chris suggested, drop the support for dropping records in
> the
> >>>>>>>> partitioner. That way, an empty list could signify the usage of a
> >>>>>> default
> >>>>>>>> partitioner. Also, if the deprecated partition() method returns
> >> null
> >>>>>>>> thereby signifying the default partitioner, the partitions() can
> >>>> return
> >>>>>> an
> >>>>>>>> empty list i.e default partitioner.
> >>>>>>>>
> >>>>>>>> 2) OR we treat a null return type of partitions() method to
> signify
> >>>> the
> >>>>>>>> usage of the default partitioner. In the default implementation of
> >>>>>>>> partitions() method, if partition() returns null, then even
> >>>> partitions()
> >>>>>>>> can return null(instead of an empty list). The RecordCollectorImpl
> >>>> code
> >>>>>> can
> >>>>>>>> also be modified accordingly. @Chris, to your point, we can even
> >> drop
> >>>>>> the
> >>>>>>>> support of dropping of records. It came up during KIP discussion,
> >>>> and I
> >>>>>>>> thought it might be a useful feature. Let me know what you think.
> >>>>>>>>
> >>>>>>>> 3) Lastly about the partition number check. I wanted to avoid the
> >>>>>> throwing
> >>>>>>>> of exception so I thought adding it might be a useful feature. But
> >> as
> >>>>>> you
> >>>>>>>> pointed out, if it can break backwards compatibility, it's better
> >> to
> >>>>>> remove
> >>>>>>>> it.
> >>>>>>>>
> >>>>>>>> Thanks!
> >>>>>>>> Sagar.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton
> >>>> <ch...@aiven.io.invalid>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> +1 to Bruno's concerns about backward compatibility. Do we
> >> actually
> >>>>>> need
> >>>>>>>>> support for dropping records in the partitioner? It doesn't seem
> >>>>>> necessary
> >>>>>>>>> based on the motivation for the KIP. If we remove that feature,
> we
> >>>>>> could
> >>>>>>>>> handle null and/or empty lists by using the default partitioning,
> >>>>>>>>> equivalent to how we handle null return values from the existing
> >>>>>> partition
> >>>>>>>>> method today.
> >>>>>>>>>
> >>>>>>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <
> cadonna@apache.org
> >>>
> >>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Sagar,
> >>>>>>>>>>
> >>>>>>>>>> Thank you for the updates!
> >>>>>>>>>>
> >>>>>>>>>> I do not intend to prolong this vote thread more than needed,
> >> but I
> >>>>>>>>>> still have some points.
> >>>>>>>>>>
> >>>>>>>>>> The deprecated partition method can return null if the default
> >>>>>>>>>> partitioning logic of the producer should be used.
> >>>>>>>>>> With the new method partitions() it seems that it is not
> possible
> >>>> to
> >>>>>> use
> >>>>>>>>>> the default partitioning logic, anymore.
> >>>>>>>>>>
> >>>>>>>>>> Also, in the default implementation of method partitions(), a
> >>>> record
> >>>>>>>>>> that would use the default partitioning logic in method
> >> partition()
> >>>>>>>>>> would be dropped, which would break backward compatibility since
> >>>>>> Streams
> >>>>>>>>>> would always call the new method partitions() even though the
> >> users
> >>>>>>>>>> still implement the deprecated method partition().
> >>>>>>>>>>
> >>>>>>>>>> I have a last point that we should probably discuss on the PR
> and
> >>>> not
> >>>>>> on
> >>>>>>>>>> the KIP but since you added the code in the KIP I need to
> mention
> >>>> it.
> >>>>>> I
> >>>>>>>>>> do not think you should check the validity of the partition
> >> number
> >>>>>> since
> >>>>>>>>>> the ProducerRecord does the same check and throws an exception.
> >> If
> >>>>>>>>>> Streams adds the same check but does not throw, the behavior is
> >> not
> >>>>>>>>>> backward compatible.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Bruno
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 30.08.22 12:43, Sagar wrote:
> >>>>>>>>>>> Thanks Bruno/Chris,
> >>>>>>>>>>>
> >>>>>>>>>>> Even I agree that might be better to keep it simple like the
> way
> >>>>>> Chris
> >>>>>>>>>>> suggested. I have updated the KIP accordingly. I made couple of
> >>>> minor
> >>>>>>>>>>> changes to the KIP:
> >>>>>>>>>>>
> >>>>>>>>>>> 1) One of them being the change of return type of partitions
> >>>> method
> >>>>>>>>> from
> >>>>>>>>>>> List to Set. This is to ensure that in case the implementation
> >> of
> >>>>>>>>>>> StreamPartitoner is buggy and ends up returning duplicate
> >>>>>>>>>>> partition numbers, we won't have duplicates thereby not trying
> >> to
> >>>>>>>>> send to
> >>>>>>>>>>> the same partition multiple times due to this.
> >>>>>>>>>>> 2) I also added a check to send the record only to valid
> >> partition
> >>>>>>>>>> numbers
> >>>>>>>>>>> and log and drop when the partition number is invalid. This is
> >>>> again
> >>>>>>>>> to
> >>>>>>>>>>> prevent errors for cases when the StreamPartitioner
> >> implementation
> >>>>>> has
> >>>>>>>>>> some
> >>>>>>>>>>> bugs (since there are no validations as such).
> >>>>>>>>>>> 3) I also updated the Test Plan section based on the suggestion
> >>>> from
> >>>>>>>>>> Bruno.
> >>>>>>>>>>> 4) I updated the default implementation of partitions method
> >>>> based on
> >>>>>>>>> the
> >>>>>>>>>>> great catch from Chris!
> >>>>>>>>>>>
> >>>>>>>>>>> Let me know if it looks fine now.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks!
> >>>>>>>>>>> Sagar.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <
> >> cadonna@apache.org
> >>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I am favour of discarding the sugar for broadcasting and leave
> >>>> the
> >>>>>>>>>>>> broadcasting to the implementation as Chris suggests. I think
> >>>> that
> >>>>>> is
> >>>>>>>>>>>> the cleanest option.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Bruno
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
> >>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I think it'd be useful to be more explicit about broadcasting
> >> to
> >>>>>> all
> >>>>>>>>>>>> topic
> >>>>>>>>>>>>> partitions rather than add implicit behavior for empty cases
> >>>> (empty
> >>>>>>>>>>>>> optional, empty list, etc.). The suggested enum approach
> would
> >>>>>>>>> address
> >>>>>>>>>>>> that
> >>>>>>>>>>>>> nicely.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It's also worth noting that there's no hard requirement to
> add
> >>>>>> sugar
> >>>>>>>>>> for
> >>>>>>>>>>>>> broadcasting to all topic partitions since the API already
> >>>> provides
> >>>>>>>>> the
> >>>>>>>>>>>>> number of topic partitions available when calling a stream
> >>>>>>>>> partitioner.
> >>>>>>>>>>>> If
> >>>>>>>>>>>>> we can't find a clean way to add this support, it might be
> >>>> better
> >>>>>> to
> >>>>>>>>>>>> leave
> >>>>>>>>>>>>> it out and just let people implement this themselves with a
> >>>> line of
> >>>>>>>>>> Java
> >>>>>>>>>>>> 8
> >>>>>>>>>>>>> streams:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>          return IntStream.range(0,
> >>>>>>>>>>>>> numPartitions).boxed().collect(Collectors.toList());
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Also worth noting that there may be a bug in the default
> >>>>>>>>> implementation
> >>>>>>>>>>>> for
> >>>>>>>>>>>>> the new StreamPartitioner::partitions method, since it
> doesn't
> >>>>>>>>> appear
> >>>>>>>>>> to
> >>>>>>>>>>>>> correctly pick up on null return values from the partition
> >>>> method
> >>>>>>>>> and
> >>>>>>>>>>>>> translate them into empty lists.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <
> >>>> cadonna@apache.org>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Sagar,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I do not see an issue with using an empty list together with
> >> an
> >>>>>>>>> empty
> >>>>>>>>>>>>>> Optional. A non-empty Optional that contains an empty list
> >>>> would
> >>>>>>>>> just
> >>>>>>>>>>>>>> say that there is no partition to which the record should be
> >>>> sent.
> >>>>>>>>> If
> >>>>>>>>>>>>>> there is no partition, the record is dropped.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> An empty Optional might be a way to say, broadcast to all
> >>>>>>>>> partitions.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Alternatively -- to make it more explicit -- you could
> return
> >>>> an
> >>>>>>>>>> object
> >>>>>>>>>>>>>> with an enum and a list of partitions. The enum could have
> >>>> values
> >>>>>>>>>> SOME,
> >>>>>>>>>>>>>> ALL, and NONE. If the value is SOME, the list of partitions
> >>>>>>>>> contains
> >>>>>>>>>> the
> >>>>>>>>>>>>>> partitions to which to send the record. If the value of the
> >>>> enum
> >>>>>> is
> >>>>>>>>>> ALL
> >>>>>>>>>>>>>> or NONE, the list of partitions is not used and might be
> even
> >>>> null
> >>>>>>>>>>>>>> (since in that case the list should not be used and it would
> >>>> be a
> >>>>>>>>> bug
> >>>>>>>>>> to
> >>>>>>>>>>>>>> use it).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 24.08.22 20:11, Sagar wrote:
> >>>>>>>>>>>>>>> Thank you Bruno/Matthew for your comments.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I agree using null does seem error prone. However I think
> >>>> using a
> >>>>>>>>>>>>>> singleton
> >>>>>>>>>>>>>>> list of [-1] might be better in terms of usability, I am
> >>>> saying
> >>>>>>>>> this
> >>>>>>>>>>>>>>> because the KIP also has a provision to return an empty
> list
> >>>> to
> >>>>>>>>> refer
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>> dropping the record. So, an empty optional and an empty
> list
> >>>> have
> >>>>>>>>>>>> totally
> >>>>>>>>>>>>>>> different meanings which could get confusing.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Let me know what you think.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>> Sagar.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
> >>>>>>>>>>>>>>> <ma...@aiven.io.invalid> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I also concur with this, having an Optional in the type
> >>>> makes it
> >>>>>>>>>> very
> >>>>>>>>>>>>>>>> clear what’s going on and better signifies an absence of
> >>>> value
> >>>>>>>>> (or
> >>>>>>>>>> in
> >>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>> case the broadcast value).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>> Matthew de Detrich
> >>>>>>>>>>>>>>>> Aiven Deutschland GmbH
> >>>>>>>>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin
> >>>>>>>>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> >>>>>>>>>>>>>>>> m: +491603708037
> >>>>>>>>>>>>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
> >>>>>>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org,
> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>> I would prefer changing the return type of partitions()
> to
> >>>>>>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as the
> >>>>>>>>> broadcast
> >>>>>>>>>>>>>>>>> value. IMO, The chances that an implementation returns
> >> null
> >>>> due
> >>>>>>>>> to
> >>>>>>>>>> a
> >>>>>>>>>>>>>> bug
> >>>>>>>>>>>>>>>>> is much higher than that an implementation returns an
> >> empty
> >>>>>>>>>> Optional
> >>>>>>>>>>>>>> due
> >>>>>>>>>>>>>>>>> to a bug.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by "Matthias J. Sax" <mj...@apache.org>.
I don't see any update on the wiki about it. Did you forget to hit "save"?

Can you also provide some background? I am not sure right now if I 
understand the proposed changes?


-Matthias

On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
> Thanks Sagar, this makes sense to me -- we clearly need additional changes
> to
> avoid breaking IQ when using this feature, but I agree with continuing to
> restrict
> FKJ since they wouldn't stop working without it, and would become much
> harder
> to reason about (than they already are) if we did enable them to use it.
> 
> And of course, they can still multicast the final results of a FKJ, they
> just can't
> mess with the internal workings of it in this way.
> 
> On Tue, Dec 6, 2022 at 9:48 AM Sagar <sa...@gmail.com> wrote:
> 
>> Hi All,
>>
>> I made a couple of edits to the KIP which came up during the code review.
>> Changes at a high level are:
>>
>> 1) KeyQueryMetada enhanced to have a new method called partitions().
>> 2) Lifting the restriction of a single partition for IQ. Now the
>> restriction holds only for FK Join.
>>
>> Updated KIP:
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
>>
>> Thanks!
>> Sagar.
>>
>> On Mon, Sep 12, 2022 at 6:43 PM Sagar <sa...@gmail.com> wrote:
>>
>>> Thanks Bruno,
>>>
>>> Marking this as accepted.
>>>
>>> Thanks everyone for their comments/feedback.
>>>
>>> Thanks!
>>> Sagar.
>>>
>>> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <ca...@apache.org>
>> wrote:
>>>
>>>> Hi Sagar,
>>>>
>>>> Thanks for the update and the PR!
>>>>
>>>> +1 (binding)
>>>>
>>>> Best,
>>>> Bruno
>>>>
>>>> On 10.09.22 18:57, Sagar wrote:
>>>>> Hi Bruno,
>>>>>
>>>>> Thanks, I think these changes make sense to me. I have updated the KIP
>>>>> accordingly.
>>>>>
>>>>> Thanks!
>>>>> Sagar.
>>>>>
>>>>> On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <ca...@apache.org>
>>>> wrote:
>>>>>
>>>>>> Hi Sagar,
>>>>>>
>>>>>> I would not drop the support for dropping records. I would also not
>>>>>> return null from partitions(). Maybe an Optional can help here. An
>>>> empty
>>>>>> Optional would mean to use the default partitioning behavior of the
>>>>>> producer. So we would have:
>>>>>>
>>>>>> - non-empty Optional, non-empty list of integers: partitions to send
>>>> the
>>>>>> record to
>>>>>> - non-empty Optional, empty list of integers: drop the record
>>>>>> - empty Optional: use default behavior
>>>>>>
>>>>>> What do other think?
>>>>>>
>>>>>> Best,
>>>>>> Bruno
>>>>>>
>>>>>> On 02.09.22 13:53, Sagar wrote:
>>>>>>> Hello Bruno/Chris,
>>>>>>>
>>>>>>> Since these are the last set of changes(I am assuming haha), it
>> would
>>>> be
>>>>>>> great if you could review the 2 options from above so that we can
>>>> close
>>>>>> the
>>>>>>> voting. Of course I am happy to incorporate any other requisite
>>>> changes.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Sagar.
>>>>>>>
>>>>>>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <sa...@gmail.com>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Bruno for the great points.
>>>>>>>>
>>>>>>>> I see 2 options here =>
>>>>>>>>
>>>>>>>> 1) As Chris suggested, drop the support for dropping records in the
>>>>>>>> partitioner. That way, an empty list could signify the usage of a
>>>>>> default
>>>>>>>> partitioner. Also, if the deprecated partition() method returns
>> null
>>>>>>>> thereby signifying the default partitioner, the partitions() can
>>>> return
>>>>>> an
>>>>>>>> empty list i.e default partitioner.
>>>>>>>>
>>>>>>>> 2) OR we treat a null return type of partitions() method to signify
>>>> the
>>>>>>>> usage of the default partitioner. In the default implementation of
>>>>>>>> partitions() method, if partition() returns null, then even
>>>> partitions()
>>>>>>>> can return null(instead of an empty list). The RecordCollectorImpl
>>>> code
>>>>>> can
>>>>>>>> also be modified accordingly. @Chris, to your point, we can even
>> drop
>>>>>> the
>>>>>>>> support of dropping of records. It came up during KIP discussion,
>>>> and I
>>>>>>>> thought it might be a useful feature. Let me know what you think.
>>>>>>>>
>>>>>>>> 3) Lastly about the partition number check. I wanted to avoid the
>>>>>> throwing
>>>>>>>> of exception so I thought adding it might be a useful feature. But
>> as
>>>>>> you
>>>>>>>> pointed out, if it can break backwards compatibility, it's better
>> to
>>>>>> remove
>>>>>>>> it.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> Sagar.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton
>>>> <ch...@aiven.io.invalid>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> +1 to Bruno's concerns about backward compatibility. Do we
>> actually
>>>>>> need
>>>>>>>>> support for dropping records in the partitioner? It doesn't seem
>>>>>> necessary
>>>>>>>>> based on the motivation for the KIP. If we remove that feature, we
>>>>>> could
>>>>>>>>> handle null and/or empty lists by using the default partitioning,
>>>>>>>>> equivalent to how we handle null return values from the existing
>>>>>> partition
>>>>>>>>> method today.
>>>>>>>>>
>>>>>>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <cadonna@apache.org
>>>
>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Sagar,
>>>>>>>>>>
>>>>>>>>>> Thank you for the updates!
>>>>>>>>>>
>>>>>>>>>> I do not intend to prolong this vote thread more than needed,
>> but I
>>>>>>>>>> still have some points.
>>>>>>>>>>
>>>>>>>>>> The deprecated partition method can return null if the default
>>>>>>>>>> partitioning logic of the producer should be used.
>>>>>>>>>> With the new method partitions() it seems that it is not possible
>>>> to
>>>>>> use
>>>>>>>>>> the default partitioning logic, anymore.
>>>>>>>>>>
>>>>>>>>>> Also, in the default implementation of method partitions(), a
>>>> record
>>>>>>>>>> that would use the default partitioning logic in method
>> partition()
>>>>>>>>>> would be dropped, which would break backward compatibility since
>>>>>> Streams
>>>>>>>>>> would always call the new method partitions() even though the
>> users
>>>>>>>>>> still implement the deprecated method partition().
>>>>>>>>>>
>>>>>>>>>> I have a last point that we should probably discuss on the PR and
>>>> not
>>>>>> on
>>>>>>>>>> the KIP but since you added the code in the KIP I need to mention
>>>> it.
>>>>>> I
>>>>>>>>>> do not think you should check the validity of the partition
>> number
>>>>>> since
>>>>>>>>>> the ProducerRecord does the same check and throws an exception.
>> If
>>>>>>>>>> Streams adds the same check but does not throw, the behavior is
>> not
>>>>>>>>>> backward compatible.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Bruno
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 30.08.22 12:43, Sagar wrote:
>>>>>>>>>>> Thanks Bruno/Chris,
>>>>>>>>>>>
>>>>>>>>>>> Even I agree that might be better to keep it simple like the way
>>>>>> Chris
>>>>>>>>>>> suggested. I have updated the KIP accordingly. I made couple of
>>>> minor
>>>>>>>>>>> changes to the KIP:
>>>>>>>>>>>
>>>>>>>>>>> 1) One of them being the change of return type of partitions
>>>> method
>>>>>>>>> from
>>>>>>>>>>> List to Set. This is to ensure that in case the implementation
>> of
>>>>>>>>>>> StreamPartitoner is buggy and ends up returning duplicate
>>>>>>>>>>> partition numbers, we won't have duplicates thereby not trying
>> to
>>>>>>>>> send to
>>>>>>>>>>> the same partition multiple times due to this.
>>>>>>>>>>> 2) I also added a check to send the record only to valid
>> partition
>>>>>>>>>> numbers
>>>>>>>>>>> and log and drop when the partition number is invalid. This is
>>>> again
>>>>>>>>> to
>>>>>>>>>>> prevent errors for cases when the StreamPartitioner
>> implementation
>>>>>> has
>>>>>>>>>> some
>>>>>>>>>>> bugs (since there are no validations as such).
>>>>>>>>>>> 3) I also updated the Test Plan section based on the suggestion
>>>> from
>>>>>>>>>> Bruno.
>>>>>>>>>>> 4) I updated the default implementation of partitions method
>>>> based on
>>>>>>>>> the
>>>>>>>>>>> great catch from Chris!
>>>>>>>>>>>
>>>>>>>>>>> Let me know if it looks fine now.
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>> Sagar.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <
>> cadonna@apache.org
>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I am favour of discarding the sugar for broadcasting and leave
>>>> the
>>>>>>>>>>>> broadcasting to the implementation as Chris suggests. I think
>>>> that
>>>>>> is
>>>>>>>>>>>> the cleanest option.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Bruno
>>>>>>>>>>>>
>>>>>>>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think it'd be useful to be more explicit about broadcasting
>> to
>>>>>> all
>>>>>>>>>>>> topic
>>>>>>>>>>>>> partitions rather than add implicit behavior for empty cases
>>>> (empty
>>>>>>>>>>>>> optional, empty list, etc.). The suggested enum approach would
>>>>>>>>> address
>>>>>>>>>>>> that
>>>>>>>>>>>>> nicely.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It's also worth noting that there's no hard requirement to add
>>>>>> sugar
>>>>>>>>>> for
>>>>>>>>>>>>> broadcasting to all topic partitions since the API already
>>>> provides
>>>>>>>>> the
>>>>>>>>>>>>> number of topic partitions available when calling a stream
>>>>>>>>> partitioner.
>>>>>>>>>>>> If
>>>>>>>>>>>>> we can't find a clean way to add this support, it might be
>>>> better
>>>>>> to
>>>>>>>>>>>> leave
>>>>>>>>>>>>> it out and just let people implement this themselves with a
>>>> line of
>>>>>>>>>> Java
>>>>>>>>>>>> 8
>>>>>>>>>>>>> streams:
>>>>>>>>>>>>>
>>>>>>>>>>>>>          return IntStream.range(0,
>>>>>>>>>>>>> numPartitions).boxed().collect(Collectors.toList());
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also worth noting that there may be a bug in the default
>>>>>>>>> implementation
>>>>>>>>>>>> for
>>>>>>>>>>>>> the new StreamPartitioner::partitions method, since it doesn't
>>>>>>>>> appear
>>>>>>>>>> to
>>>>>>>>>>>>> correctly pick up on null return values from the partition
>>>> method
>>>>>>>>> and
>>>>>>>>>>>>> translate them into empty lists.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Chris
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <
>>>> cadonna@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Sagar,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I do not see an issue with using an empty list together with
>> an
>>>>>>>>> empty
>>>>>>>>>>>>>> Optional. A non-empty Optional that contains an empty list
>>>> would
>>>>>>>>> just
>>>>>>>>>>>>>> say that there is no partition to which the record should be
>>>> sent.
>>>>>>>>> If
>>>>>>>>>>>>>> there is no partition, the record is dropped.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> An empty Optional might be a way to say, broadcast to all
>>>>>>>>> partitions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Alternatively -- to make it more explicit -- you could return
>>>> an
>>>>>>>>>> object
>>>>>>>>>>>>>> with an enum and a list of partitions. The enum could have
>>>> values
>>>>>>>>>> SOME,
>>>>>>>>>>>>>> ALL, and NONE. If the value is SOME, the list of partitions
>>>>>>>>> contains
>>>>>>>>>> the
>>>>>>>>>>>>>> partitions to which to send the record. If the value of the
>>>> enum
>>>>>> is
>>>>>>>>>> ALL
>>>>>>>>>>>>>> or NONE, the list of partitions is not used and might be even
>>>> null
>>>>>>>>>>>>>> (since in that case the list should not be used and it would
>>>> be a
>>>>>>>>> bug
>>>>>>>>>> to
>>>>>>>>>>>>>> use it).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Bruno
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 24.08.22 20:11, Sagar wrote:
>>>>>>>>>>>>>>> Thank you Bruno/Matthew for your comments.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I agree using null does seem error prone. However I think
>>>> using a
>>>>>>>>>>>>>> singleton
>>>>>>>>>>>>>>> list of [-1] might be better in terms of usability, I am
>>>> saying
>>>>>>>>> this
>>>>>>>>>>>>>>> because the KIP also has a provision to return an empty list
>>>> to
>>>>>>>>> refer
>>>>>>>>>>>> to
>>>>>>>>>>>>>>> dropping the record. So, an empty optional and an empty list
>>>> have
>>>>>>>>>>>> totally
>>>>>>>>>>>>>>> different meanings which could get confusing.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Let me know what you think.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>> Sagar.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
>>>>>>>>>>>>>>> <ma...@aiven.io.invalid> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I also concur with this, having an Optional in the type
>>>> makes it
>>>>>>>>>> very
>>>>>>>>>>>>>>>> clear what’s going on and better signifies an absence of
>>>> value
>>>>>>>>> (or
>>>>>>>>>> in
>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>> case the broadcast value).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Matthew de Detrich
>>>>>>>>>>>>>>>> Aiven Deutschland GmbH
>>>>>>>>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin
>>>>>>>>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>>>>>>>>>>>>>>>> m: +491603708037
>>>>>>>>>>>>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
>>>>>>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2.
>>>>>>>>>>>>>>>>> I would prefer changing the return type of partitions() to
>>>>>>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as the
>>>>>>>>> broadcast
>>>>>>>>>>>>>>>>> value. IMO, The chances that an implementation returns
>> null
>>>> due
>>>>>>>>> to
>>>>>>>>>> a
>>>>>>>>>>>>>> bug
>>>>>>>>>>>>>>>>> is much higher than that an implementation returns an
>> empty
>>>>>>>>>> Optional
>>>>>>>>>>>>>> due
>>>>>>>>>>>>>>>>> to a bug.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> 

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sophie Blee-Goldman <so...@confluent.io.INVALID>.
Thanks Sagar, this makes sense to me -- we clearly need additional changes
to
avoid breaking IQ when using this feature, but I agree with continuing to
restrict
FKJ since they wouldn't stop working without it, and would become much
harder
to reason about (than they already are) if we did enable them to use it.

And of course, they can still multicast the final results of a FKJ, they
just can't
mess with the internal workings of it in this way.

On Tue, Dec 6, 2022 at 9:48 AM Sagar <sa...@gmail.com> wrote:

> Hi All,
>
> I made a couple of edits to the KIP which came up during the code review.
> Changes at a high level are:
>
> 1) KeyQueryMetada enhanced to have a new method called partitions().
> 2) Lifting the restriction of a single partition for IQ. Now the
> restriction holds only for FK Join.
>
> Updated KIP:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
>
> Thanks!
> Sagar.
>
> On Mon, Sep 12, 2022 at 6:43 PM Sagar <sa...@gmail.com> wrote:
>
> > Thanks Bruno,
> >
> > Marking this as accepted.
> >
> > Thanks everyone for their comments/feedback.
> >
> > Thanks!
> > Sagar.
> >
> > On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <ca...@apache.org>
> wrote:
> >
> >> Hi Sagar,
> >>
> >> Thanks for the update and the PR!
> >>
> >> +1 (binding)
> >>
> >> Best,
> >> Bruno
> >>
> >> On 10.09.22 18:57, Sagar wrote:
> >> > Hi Bruno,
> >> >
> >> > Thanks, I think these changes make sense to me. I have updated the KIP
> >> > accordingly.
> >> >
> >> > Thanks!
> >> > Sagar.
> >> >
> >> > On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <ca...@apache.org>
> >> wrote:
> >> >
> >> >> Hi Sagar,
> >> >>
> >> >> I would not drop the support for dropping records. I would also not
> >> >> return null from partitions(). Maybe an Optional can help here. An
> >> empty
> >> >> Optional would mean to use the default partitioning behavior of the
> >> >> producer. So we would have:
> >> >>
> >> >> - non-empty Optional, non-empty list of integers: partitions to send
> >> the
> >> >> record to
> >> >> - non-empty Optional, empty list of integers: drop the record
> >> >> - empty Optional: use default behavior
> >> >>
> >> >> What do other think?
> >> >>
> >> >> Best,
> >> >> Bruno
> >> >>
> >> >> On 02.09.22 13:53, Sagar wrote:
> >> >>> Hello Bruno/Chris,
> >> >>>
> >> >>> Since these are the last set of changes(I am assuming haha), it
> would
> >> be
> >> >>> great if you could review the 2 options from above so that we can
> >> close
> >> >> the
> >> >>> voting. Of course I am happy to incorporate any other requisite
> >> changes.
> >> >>>
> >> >>> Thanks!
> >> >>> Sagar.
> >> >>>
> >> >>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <sa...@gmail.com>
> >> >> wrote:
> >> >>>
> >> >>>> Thanks Bruno for the great points.
> >> >>>>
> >> >>>> I see 2 options here =>
> >> >>>>
> >> >>>> 1) As Chris suggested, drop the support for dropping records in the
> >> >>>> partitioner. That way, an empty list could signify the usage of a
> >> >> default
> >> >>>> partitioner. Also, if the deprecated partition() method returns
> null
> >> >>>> thereby signifying the default partitioner, the partitions() can
> >> return
> >> >> an
> >> >>>> empty list i.e default partitioner.
> >> >>>>
> >> >>>> 2) OR we treat a null return type of partitions() method to signify
> >> the
> >> >>>> usage of the default partitioner. In the default implementation of
> >> >>>> partitions() method, if partition() returns null, then even
> >> partitions()
> >> >>>> can return null(instead of an empty list). The RecordCollectorImpl
> >> code
> >> >> can
> >> >>>> also be modified accordingly. @Chris, to your point, we can even
> drop
> >> >> the
> >> >>>> support of dropping of records. It came up during KIP discussion,
> >> and I
> >> >>>> thought it might be a useful feature. Let me know what you think.
> >> >>>>
> >> >>>> 3) Lastly about the partition number check. I wanted to avoid the
> >> >> throwing
> >> >>>> of exception so I thought adding it might be a useful feature. But
> as
> >> >> you
> >> >>>> pointed out, if it can break backwards compatibility, it's better
> to
> >> >> remove
> >> >>>> it.
> >> >>>>
> >> >>>> Thanks!
> >> >>>> Sagar.
> >> >>>>
> >> >>>>
> >> >>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton
> >> <ch...@aiven.io.invalid>
> >> >>>> wrote:
> >> >>>>
> >> >>>>> +1 to Bruno's concerns about backward compatibility. Do we
> actually
> >> >> need
> >> >>>>> support for dropping records in the partitioner? It doesn't seem
> >> >> necessary
> >> >>>>> based on the motivation for the KIP. If we remove that feature, we
> >> >> could
> >> >>>>> handle null and/or empty lists by using the default partitioning,
> >> >>>>> equivalent to how we handle null return values from the existing
> >> >> partition
> >> >>>>> method today.
> >> >>>>>
> >> >>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <cadonna@apache.org
> >
> >> >> wrote:
> >> >>>>>
> >> >>>>>> Hi Sagar,
> >> >>>>>>
> >> >>>>>> Thank you for the updates!
> >> >>>>>>
> >> >>>>>> I do not intend to prolong this vote thread more than needed,
> but I
> >> >>>>>> still have some points.
> >> >>>>>>
> >> >>>>>> The deprecated partition method can return null if the default
> >> >>>>>> partitioning logic of the producer should be used.
> >> >>>>>> With the new method partitions() it seems that it is not possible
> >> to
> >> >> use
> >> >>>>>> the default partitioning logic, anymore.
> >> >>>>>>
> >> >>>>>> Also, in the default implementation of method partitions(), a
> >> record
> >> >>>>>> that would use the default partitioning logic in method
> partition()
> >> >>>>>> would be dropped, which would break backward compatibility since
> >> >> Streams
> >> >>>>>> would always call the new method partitions() even though the
> users
> >> >>>>>> still implement the deprecated method partition().
> >> >>>>>>
> >> >>>>>> I have a last point that we should probably discuss on the PR and
> >> not
> >> >> on
> >> >>>>>> the KIP but since you added the code in the KIP I need to mention
> >> it.
> >> >> I
> >> >>>>>> do not think you should check the validity of the partition
> number
> >> >> since
> >> >>>>>> the ProducerRecord does the same check and throws an exception.
> If
> >> >>>>>> Streams adds the same check but does not throw, the behavior is
> not
> >> >>>>>> backward compatible.
> >> >>>>>>
> >> >>>>>> Best,
> >> >>>>>> Bruno
> >> >>>>>>
> >> >>>>>>
> >> >>>>>> On 30.08.22 12:43, Sagar wrote:
> >> >>>>>>> Thanks Bruno/Chris,
> >> >>>>>>>
> >> >>>>>>> Even I agree that might be better to keep it simple like the way
> >> >> Chris
> >> >>>>>>> suggested. I have updated the KIP accordingly. I made couple of
> >> minor
> >> >>>>>>> changes to the KIP:
> >> >>>>>>>
> >> >>>>>>> 1) One of them being the change of return type of partitions
> >> method
> >> >>>>> from
> >> >>>>>>> List to Set. This is to ensure that in case the implementation
> of
> >> >>>>>>> StreamPartitoner is buggy and ends up returning duplicate
> >> >>>>>>> partition numbers, we won't have duplicates thereby not trying
> to
> >> >>>>> send to
> >> >>>>>>> the same partition multiple times due to this.
> >> >>>>>>> 2) I also added a check to send the record only to valid
> partition
> >> >>>>>> numbers
> >> >>>>>>> and log and drop when the partition number is invalid. This is
> >> again
> >> >>>>> to
> >> >>>>>>> prevent errors for cases when the StreamPartitioner
> implementation
> >> >> has
> >> >>>>>> some
> >> >>>>>>> bugs (since there are no validations as such).
> >> >>>>>>> 3) I also updated the Test Plan section based on the suggestion
> >> from
> >> >>>>>> Bruno.
> >> >>>>>>> 4) I updated the default implementation of partitions method
> >> based on
> >> >>>>> the
> >> >>>>>>> great catch from Chris!
> >> >>>>>>>
> >> >>>>>>> Let me know if it looks fine now.
> >> >>>>>>>
> >> >>>>>>> Thanks!
> >> >>>>>>> Sagar.
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <
> cadonna@apache.org
> >> >
> >> >>>>>> wrote:
> >> >>>>>>>
> >> >>>>>>>> Hi,
> >> >>>>>>>>
> >> >>>>>>>> I am favour of discarding the sugar for broadcasting and leave
> >> the
> >> >>>>>>>> broadcasting to the implementation as Chris suggests. I think
> >> that
> >> >> is
> >> >>>>>>>> the cleanest option.
> >> >>>>>>>>
> >> >>>>>>>> Best,
> >> >>>>>>>> Bruno
> >> >>>>>>>>
> >> >>>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
> >> >>>>>>>>> Hi all,
> >> >>>>>>>>>
> >> >>>>>>>>> I think it'd be useful to be more explicit about broadcasting
> to
> >> >> all
> >> >>>>>>>> topic
> >> >>>>>>>>> partitions rather than add implicit behavior for empty cases
> >> (empty
> >> >>>>>>>>> optional, empty list, etc.). The suggested enum approach would
> >> >>>>> address
> >> >>>>>>>> that
> >> >>>>>>>>> nicely.
> >> >>>>>>>>>
> >> >>>>>>>>> It's also worth noting that there's no hard requirement to add
> >> >> sugar
> >> >>>>>> for
> >> >>>>>>>>> broadcasting to all topic partitions since the API already
> >> provides
> >> >>>>> the
> >> >>>>>>>>> number of topic partitions available when calling a stream
> >> >>>>> partitioner.
> >> >>>>>>>> If
> >> >>>>>>>>> we can't find a clean way to add this support, it might be
> >> better
> >> >> to
> >> >>>>>>>> leave
> >> >>>>>>>>> it out and just let people implement this themselves with a
> >> line of
> >> >>>>>> Java
> >> >>>>>>>> 8
> >> >>>>>>>>> streams:
> >> >>>>>>>>>
> >> >>>>>>>>>         return IntStream.range(0,
> >> >>>>>>>>> numPartitions).boxed().collect(Collectors.toList());
> >> >>>>>>>>>
> >> >>>>>>>>> Also worth noting that there may be a bug in the default
> >> >>>>> implementation
> >> >>>>>>>> for
> >> >>>>>>>>> the new StreamPartitioner::partitions method, since it doesn't
> >> >>>>> appear
> >> >>>>>> to
> >> >>>>>>>>> correctly pick up on null return values from the partition
> >> method
> >> >>>>> and
> >> >>>>>>>>> translate them into empty lists.
> >> >>>>>>>>>
> >> >>>>>>>>> Cheers,
> >> >>>>>>>>>
> >> >>>>>>>>> Chris
> >> >>>>>>>>>
> >> >>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <
> >> cadonna@apache.org>
> >> >>>>>>>> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>>> Hi Sagar,
> >> >>>>>>>>>>
> >> >>>>>>>>>> I do not see an issue with using an empty list together with
> an
> >> >>>>> empty
> >> >>>>>>>>>> Optional. A non-empty Optional that contains an empty list
> >> would
> >> >>>>> just
> >> >>>>>>>>>> say that there is no partition to which the record should be
> >> sent.
> >> >>>>> If
> >> >>>>>>>>>> there is no partition, the record is dropped.
> >> >>>>>>>>>>
> >> >>>>>>>>>> An empty Optional might be a way to say, broadcast to all
> >> >>>>> partitions.
> >> >>>>>>>>>>
> >> >>>>>>>>>> Alternatively -- to make it more explicit -- you could return
> >> an
> >> >>>>>> object
> >> >>>>>>>>>> with an enum and a list of partitions. The enum could have
> >> values
> >> >>>>>> SOME,
> >> >>>>>>>>>> ALL, and NONE. If the value is SOME, the list of partitions
> >> >>>>> contains
> >> >>>>>> the
> >> >>>>>>>>>> partitions to which to send the record. If the value of the
> >> enum
> >> >> is
> >> >>>>>> ALL
> >> >>>>>>>>>> or NONE, the list of partitions is not used and might be even
> >> null
> >> >>>>>>>>>> (since in that case the list should not be used and it would
> >> be a
> >> >>>>> bug
> >> >>>>>> to
> >> >>>>>>>>>> use it).
> >> >>>>>>>>>>
> >> >>>>>>>>>> Best,
> >> >>>>>>>>>> Bruno
> >> >>>>>>>>>>
> >> >>>>>>>>>> On 24.08.22 20:11, Sagar wrote:
> >> >>>>>>>>>>> Thank you Bruno/Matthew for your comments.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> I agree using null does seem error prone. However I think
> >> using a
> >> >>>>>>>>>> singleton
> >> >>>>>>>>>>> list of [-1] might be better in terms of usability, I am
> >> saying
> >> >>>>> this
> >> >>>>>>>>>>> because the KIP also has a provision to return an empty list
> >> to
> >> >>>>> refer
> >> >>>>>>>> to
> >> >>>>>>>>>>> dropping the record. So, an empty optional and an empty list
> >> have
> >> >>>>>>>> totally
> >> >>>>>>>>>>> different meanings which could get confusing.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Let me know what you think.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Thanks!
> >> >>>>>>>>>>> Sagar.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
> >> >>>>>>>>>>> <ma...@aiven.io.invalid> wrote:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>> I also concur with this, having an Optional in the type
> >> makes it
> >> >>>>>> very
> >> >>>>>>>>>>>> clear what’s going on and better signifies an absence of
> >> value
> >> >>>>> (or
> >> >>>>>> in
> >> >>>>>>>>>> this
> >> >>>>>>>>>>>> case the broadcast value).
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> --
> >> >>>>>>>>>>>> Matthew de Detrich
> >> >>>>>>>>>>>> Aiven Deutschland GmbH
> >> >>>>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin
> >> >>>>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> >> >>>>>>>>>>>> m: +491603708037
> >> >>>>>>>>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
> >> >>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
> >> >>>>>>>>>>>>>
> >> >>>>>>>>>>>>> 2.
> >> >>>>>>>>>>>>> I would prefer changing the return type of partitions() to
> >> >>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as the
> >> >>>>> broadcast
> >> >>>>>>>>>>>>> value. IMO, The chances that an implementation returns
> null
> >> due
> >> >>>>> to
> >> >>>>>> a
> >> >>>>>>>>>> bug
> >> >>>>>>>>>>>>> is much higher than that an implementation returns an
> empty
> >> >>>>>> Optional
> >> >>>>>>>>>> due
> >> >>>>>>>>>>>>> to a bug.
> >> >>>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>
> >> >>>>>>
> >> >>>>>
> >> >>>>
> >> >>>
> >> >>
> >> >
> >>
> >
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sagar <sa...@gmail.com>.
Hi All,

I made a couple of edits to the KIP which came up during the code review.
Changes at a high level are:

1) KeyQueryMetada enhanced to have a new method called partitions().
2) Lifting the restriction of a single partition for IQ. Now the
restriction holds only for FK Join.

Updated KIP:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356

Thanks!
Sagar.

On Mon, Sep 12, 2022 at 6:43 PM Sagar <sa...@gmail.com> wrote:

> Thanks Bruno,
>
> Marking this as accepted.
>
> Thanks everyone for their comments/feedback.
>
> Thanks!
> Sagar.
>
> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <ca...@apache.org> wrote:
>
>> Hi Sagar,
>>
>> Thanks for the update and the PR!
>>
>> +1 (binding)
>>
>> Best,
>> Bruno
>>
>> On 10.09.22 18:57, Sagar wrote:
>> > Hi Bruno,
>> >
>> > Thanks, I think these changes make sense to me. I have updated the KIP
>> > accordingly.
>> >
>> > Thanks!
>> > Sagar.
>> >
>> > On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <ca...@apache.org>
>> wrote:
>> >
>> >> Hi Sagar,
>> >>
>> >> I would not drop the support for dropping records. I would also not
>> >> return null from partitions(). Maybe an Optional can help here. An
>> empty
>> >> Optional would mean to use the default partitioning behavior of the
>> >> producer. So we would have:
>> >>
>> >> - non-empty Optional, non-empty list of integers: partitions to send
>> the
>> >> record to
>> >> - non-empty Optional, empty list of integers: drop the record
>> >> - empty Optional: use default behavior
>> >>
>> >> What do other think?
>> >>
>> >> Best,
>> >> Bruno
>> >>
>> >> On 02.09.22 13:53, Sagar wrote:
>> >>> Hello Bruno/Chris,
>> >>>
>> >>> Since these are the last set of changes(I am assuming haha), it would
>> be
>> >>> great if you could review the 2 options from above so that we can
>> close
>> >> the
>> >>> voting. Of course I am happy to incorporate any other requisite
>> changes.
>> >>>
>> >>> Thanks!
>> >>> Sagar.
>> >>>
>> >>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <sa...@gmail.com>
>> >> wrote:
>> >>>
>> >>>> Thanks Bruno for the great points.
>> >>>>
>> >>>> I see 2 options here =>
>> >>>>
>> >>>> 1) As Chris suggested, drop the support for dropping records in the
>> >>>> partitioner. That way, an empty list could signify the usage of a
>> >> default
>> >>>> partitioner. Also, if the deprecated partition() method returns null
>> >>>> thereby signifying the default partitioner, the partitions() can
>> return
>> >> an
>> >>>> empty list i.e default partitioner.
>> >>>>
>> >>>> 2) OR we treat a null return type of partitions() method to signify
>> the
>> >>>> usage of the default partitioner. In the default implementation of
>> >>>> partitions() method, if partition() returns null, then even
>> partitions()
>> >>>> can return null(instead of an empty list). The RecordCollectorImpl
>> code
>> >> can
>> >>>> also be modified accordingly. @Chris, to your point, we can even drop
>> >> the
>> >>>> support of dropping of records. It came up during KIP discussion,
>> and I
>> >>>> thought it might be a useful feature. Let me know what you think.
>> >>>>
>> >>>> 3) Lastly about the partition number check. I wanted to avoid the
>> >> throwing
>> >>>> of exception so I thought adding it might be a useful feature. But as
>> >> you
>> >>>> pointed out, if it can break backwards compatibility, it's better to
>> >> remove
>> >>>> it.
>> >>>>
>> >>>> Thanks!
>> >>>> Sagar.
>> >>>>
>> >>>>
>> >>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton
>> <ch...@aiven.io.invalid>
>> >>>> wrote:
>> >>>>
>> >>>>> +1 to Bruno's concerns about backward compatibility. Do we actually
>> >> need
>> >>>>> support for dropping records in the partitioner? It doesn't seem
>> >> necessary
>> >>>>> based on the motivation for the KIP. If we remove that feature, we
>> >> could
>> >>>>> handle null and/or empty lists by using the default partitioning,
>> >>>>> equivalent to how we handle null return values from the existing
>> >> partition
>> >>>>> method today.
>> >>>>>
>> >>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <ca...@apache.org>
>> >> wrote:
>> >>>>>
>> >>>>>> Hi Sagar,
>> >>>>>>
>> >>>>>> Thank you for the updates!
>> >>>>>>
>> >>>>>> I do not intend to prolong this vote thread more than needed, but I
>> >>>>>> still have some points.
>> >>>>>>
>> >>>>>> The deprecated partition method can return null if the default
>> >>>>>> partitioning logic of the producer should be used.
>> >>>>>> With the new method partitions() it seems that it is not possible
>> to
>> >> use
>> >>>>>> the default partitioning logic, anymore.
>> >>>>>>
>> >>>>>> Also, in the default implementation of method partitions(), a
>> record
>> >>>>>> that would use the default partitioning logic in method partition()
>> >>>>>> would be dropped, which would break backward compatibility since
>> >> Streams
>> >>>>>> would always call the new method partitions() even though the users
>> >>>>>> still implement the deprecated method partition().
>> >>>>>>
>> >>>>>> I have a last point that we should probably discuss on the PR and
>> not
>> >> on
>> >>>>>> the KIP but since you added the code in the KIP I need to mention
>> it.
>> >> I
>> >>>>>> do not think you should check the validity of the partition number
>> >> since
>> >>>>>> the ProducerRecord does the same check and throws an exception. If
>> >>>>>> Streams adds the same check but does not throw, the behavior is not
>> >>>>>> backward compatible.
>> >>>>>>
>> >>>>>> Best,
>> >>>>>> Bruno
>> >>>>>>
>> >>>>>>
>> >>>>>> On 30.08.22 12:43, Sagar wrote:
>> >>>>>>> Thanks Bruno/Chris,
>> >>>>>>>
>> >>>>>>> Even I agree that might be better to keep it simple like the way
>> >> Chris
>> >>>>>>> suggested. I have updated the KIP accordingly. I made couple of
>> minor
>> >>>>>>> changes to the KIP:
>> >>>>>>>
>> >>>>>>> 1) One of them being the change of return type of partitions
>> method
>> >>>>> from
>> >>>>>>> List to Set. This is to ensure that in case the implementation of
>> >>>>>>> StreamPartitoner is buggy and ends up returning duplicate
>> >>>>>>> partition numbers, we won't have duplicates thereby not trying to
>> >>>>> send to
>> >>>>>>> the same partition multiple times due to this.
>> >>>>>>> 2) I also added a check to send the record only to valid partition
>> >>>>>> numbers
>> >>>>>>> and log and drop when the partition number is invalid. This is
>> again
>> >>>>> to
>> >>>>>>> prevent errors for cases when the StreamPartitioner implementation
>> >> has
>> >>>>>> some
>> >>>>>>> bugs (since there are no validations as such).
>> >>>>>>> 3) I also updated the Test Plan section based on the suggestion
>> from
>> >>>>>> Bruno.
>> >>>>>>> 4) I updated the default implementation of partitions method
>> based on
>> >>>>> the
>> >>>>>>> great catch from Chris!
>> >>>>>>>
>> >>>>>>> Let me know if it looks fine now.
>> >>>>>>>
>> >>>>>>> Thanks!
>> >>>>>>> Sagar.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <cadonna@apache.org
>> >
>> >>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Hi,
>> >>>>>>>>
>> >>>>>>>> I am favour of discarding the sugar for broadcasting and leave
>> the
>> >>>>>>>> broadcasting to the implementation as Chris suggests. I think
>> that
>> >> is
>> >>>>>>>> the cleanest option.
>> >>>>>>>>
>> >>>>>>>> Best,
>> >>>>>>>> Bruno
>> >>>>>>>>
>> >>>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
>> >>>>>>>>> Hi all,
>> >>>>>>>>>
>> >>>>>>>>> I think it'd be useful to be more explicit about broadcasting to
>> >> all
>> >>>>>>>> topic
>> >>>>>>>>> partitions rather than add implicit behavior for empty cases
>> (empty
>> >>>>>>>>> optional, empty list, etc.). The suggested enum approach would
>> >>>>> address
>> >>>>>>>> that
>> >>>>>>>>> nicely.
>> >>>>>>>>>
>> >>>>>>>>> It's also worth noting that there's no hard requirement to add
>> >> sugar
>> >>>>>> for
>> >>>>>>>>> broadcasting to all topic partitions since the API already
>> provides
>> >>>>> the
>> >>>>>>>>> number of topic partitions available when calling a stream
>> >>>>> partitioner.
>> >>>>>>>> If
>> >>>>>>>>> we can't find a clean way to add this support, it might be
>> better
>> >> to
>> >>>>>>>> leave
>> >>>>>>>>> it out and just let people implement this themselves with a
>> line of
>> >>>>>> Java
>> >>>>>>>> 8
>> >>>>>>>>> streams:
>> >>>>>>>>>
>> >>>>>>>>>         return IntStream.range(0,
>> >>>>>>>>> numPartitions).boxed().collect(Collectors.toList());
>> >>>>>>>>>
>> >>>>>>>>> Also worth noting that there may be a bug in the default
>> >>>>> implementation
>> >>>>>>>> for
>> >>>>>>>>> the new StreamPartitioner::partitions method, since it doesn't
>> >>>>> appear
>> >>>>>> to
>> >>>>>>>>> correctly pick up on null return values from the partition
>> method
>> >>>>> and
>> >>>>>>>>> translate them into empty lists.
>> >>>>>>>>>
>> >>>>>>>>> Cheers,
>> >>>>>>>>>
>> >>>>>>>>> Chris
>> >>>>>>>>>
>> >>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <
>> cadonna@apache.org>
>> >>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> Hi Sagar,
>> >>>>>>>>>>
>> >>>>>>>>>> I do not see an issue with using an empty list together with an
>> >>>>> empty
>> >>>>>>>>>> Optional. A non-empty Optional that contains an empty list
>> would
>> >>>>> just
>> >>>>>>>>>> say that there is no partition to which the record should be
>> sent.
>> >>>>> If
>> >>>>>>>>>> there is no partition, the record is dropped.
>> >>>>>>>>>>
>> >>>>>>>>>> An empty Optional might be a way to say, broadcast to all
>> >>>>> partitions.
>> >>>>>>>>>>
>> >>>>>>>>>> Alternatively -- to make it more explicit -- you could return
>> an
>> >>>>>> object
>> >>>>>>>>>> with an enum and a list of partitions. The enum could have
>> values
>> >>>>>> SOME,
>> >>>>>>>>>> ALL, and NONE. If the value is SOME, the list of partitions
>> >>>>> contains
>> >>>>>> the
>> >>>>>>>>>> partitions to which to send the record. If the value of the
>> enum
>> >> is
>> >>>>>> ALL
>> >>>>>>>>>> or NONE, the list of partitions is not used and might be even
>> null
>> >>>>>>>>>> (since in that case the list should not be used and it would
>> be a
>> >>>>> bug
>> >>>>>> to
>> >>>>>>>>>> use it).
>> >>>>>>>>>>
>> >>>>>>>>>> Best,
>> >>>>>>>>>> Bruno
>> >>>>>>>>>>
>> >>>>>>>>>> On 24.08.22 20:11, Sagar wrote:
>> >>>>>>>>>>> Thank you Bruno/Matthew for your comments.
>> >>>>>>>>>>>
>> >>>>>>>>>>> I agree using null does seem error prone. However I think
>> using a
>> >>>>>>>>>> singleton
>> >>>>>>>>>>> list of [-1] might be better in terms of usability, I am
>> saying
>> >>>>> this
>> >>>>>>>>>>> because the KIP also has a provision to return an empty list
>> to
>> >>>>> refer
>> >>>>>>>> to
>> >>>>>>>>>>> dropping the record. So, an empty optional and an empty list
>> have
>> >>>>>>>> totally
>> >>>>>>>>>>> different meanings which could get confusing.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Let me know what you think.
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks!
>> >>>>>>>>>>> Sagar.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
>> >>>>>>>>>>> <ma...@aiven.io.invalid> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>>> I also concur with this, having an Optional in the type
>> makes it
>> >>>>>> very
>> >>>>>>>>>>>> clear what’s going on and better signifies an absence of
>> value
>> >>>>> (or
>> >>>>>> in
>> >>>>>>>>>> this
>> >>>>>>>>>>>> case the broadcast value).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> --
>> >>>>>>>>>>>> Matthew de Detrich
>> >>>>>>>>>>>> Aiven Deutschland GmbH
>> >>>>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin
>> >>>>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>> >>>>>>>>>>>> m: +491603708037
>> >>>>>>>>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
>> >>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> 2.
>> >>>>>>>>>>>>> I would prefer changing the return type of partitions() to
>> >>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as the
>> >>>>> broadcast
>> >>>>>>>>>>>>> value. IMO, The chances that an implementation returns null
>> due
>> >>>>> to
>> >>>>>> a
>> >>>>>>>>>> bug
>> >>>>>>>>>>>>> is much higher than that an implementation returns an empty
>> >>>>>> Optional
>> >>>>>>>>>> due
>> >>>>>>>>>>>>> to a bug.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sagar <sa...@gmail.com>.
Thanks Bruno,

Marking this as accepted.

Thanks everyone for their comments/feedback.

Thanks!
Sagar.

On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <ca...@apache.org> wrote:

> Hi Sagar,
>
> Thanks for the update and the PR!
>
> +1 (binding)
>
> Best,
> Bruno
>
> On 10.09.22 18:57, Sagar wrote:
> > Hi Bruno,
> >
> > Thanks, I think these changes make sense to me. I have updated the KIP
> > accordingly.
> >
> > Thanks!
> > Sagar.
> >
> > On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <ca...@apache.org> wrote:
> >
> >> Hi Sagar,
> >>
> >> I would not drop the support for dropping records. I would also not
> >> return null from partitions(). Maybe an Optional can help here. An empty
> >> Optional would mean to use the default partitioning behavior of the
> >> producer. So we would have:
> >>
> >> - non-empty Optional, non-empty list of integers: partitions to send the
> >> record to
> >> - non-empty Optional, empty list of integers: drop the record
> >> - empty Optional: use default behavior
> >>
> >> What do other think?
> >>
> >> Best,
> >> Bruno
> >>
> >> On 02.09.22 13:53, Sagar wrote:
> >>> Hello Bruno/Chris,
> >>>
> >>> Since these are the last set of changes(I am assuming haha), it would
> be
> >>> great if you could review the 2 options from above so that we can close
> >> the
> >>> voting. Of course I am happy to incorporate any other requisite
> changes.
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <sa...@gmail.com>
> >> wrote:
> >>>
> >>>> Thanks Bruno for the great points.
> >>>>
> >>>> I see 2 options here =>
> >>>>
> >>>> 1) As Chris suggested, drop the support for dropping records in the
> >>>> partitioner. That way, an empty list could signify the usage of a
> >> default
> >>>> partitioner. Also, if the deprecated partition() method returns null
> >>>> thereby signifying the default partitioner, the partitions() can
> return
> >> an
> >>>> empty list i.e default partitioner.
> >>>>
> >>>> 2) OR we treat a null return type of partitions() method to signify
> the
> >>>> usage of the default partitioner. In the default implementation of
> >>>> partitions() method, if partition() returns null, then even
> partitions()
> >>>> can return null(instead of an empty list). The RecordCollectorImpl
> code
> >> can
> >>>> also be modified accordingly. @Chris, to your point, we can even drop
> >> the
> >>>> support of dropping of records. It came up during KIP discussion, and
> I
> >>>> thought it might be a useful feature. Let me know what you think.
> >>>>
> >>>> 3) Lastly about the partition number check. I wanted to avoid the
> >> throwing
> >>>> of exception so I thought adding it might be a useful feature. But as
> >> you
> >>>> pointed out, if it can break backwards compatibility, it's better to
> >> remove
> >>>> it.
> >>>>
> >>>> Thanks!
> >>>> Sagar.
> >>>>
> >>>>
> >>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton <chrise@aiven.io.invalid
> >
> >>>> wrote:
> >>>>
> >>>>> +1 to Bruno's concerns about backward compatibility. Do we actually
> >> need
> >>>>> support for dropping records in the partitioner? It doesn't seem
> >> necessary
> >>>>> based on the motivation for the KIP. If we remove that feature, we
> >> could
> >>>>> handle null and/or empty lists by using the default partitioning,
> >>>>> equivalent to how we handle null return values from the existing
> >> partition
> >>>>> method today.
> >>>>>
> >>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <ca...@apache.org>
> >> wrote:
> >>>>>
> >>>>>> Hi Sagar,
> >>>>>>
> >>>>>> Thank you for the updates!
> >>>>>>
> >>>>>> I do not intend to prolong this vote thread more than needed, but I
> >>>>>> still have some points.
> >>>>>>
> >>>>>> The deprecated partition method can return null if the default
> >>>>>> partitioning logic of the producer should be used.
> >>>>>> With the new method partitions() it seems that it is not possible to
> >> use
> >>>>>> the default partitioning logic, anymore.
> >>>>>>
> >>>>>> Also, in the default implementation of method partitions(), a record
> >>>>>> that would use the default partitioning logic in method partition()
> >>>>>> would be dropped, which would break backward compatibility since
> >> Streams
> >>>>>> would always call the new method partitions() even though the users
> >>>>>> still implement the deprecated method partition().
> >>>>>>
> >>>>>> I have a last point that we should probably discuss on the PR and
> not
> >> on
> >>>>>> the KIP but since you added the code in the KIP I need to mention
> it.
> >> I
> >>>>>> do not think you should check the validity of the partition number
> >> since
> >>>>>> the ProducerRecord does the same check and throws an exception. If
> >>>>>> Streams adds the same check but does not throw, the behavior is not
> >>>>>> backward compatible.
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>>
> >>>>>> On 30.08.22 12:43, Sagar wrote:
> >>>>>>> Thanks Bruno/Chris,
> >>>>>>>
> >>>>>>> Even I agree that might be better to keep it simple like the way
> >> Chris
> >>>>>>> suggested. I have updated the KIP accordingly. I made couple of
> minor
> >>>>>>> changes to the KIP:
> >>>>>>>
> >>>>>>> 1) One of them being the change of return type of partitions method
> >>>>> from
> >>>>>>> List to Set. This is to ensure that in case the implementation of
> >>>>>>> StreamPartitoner is buggy and ends up returning duplicate
> >>>>>>> partition numbers, we won't have duplicates thereby not trying to
> >>>>> send to
> >>>>>>> the same partition multiple times due to this.
> >>>>>>> 2) I also added a check to send the record only to valid partition
> >>>>>> numbers
> >>>>>>> and log and drop when the partition number is invalid. This is
> again
> >>>>> to
> >>>>>>> prevent errors for cases when the StreamPartitioner implementation
> >> has
> >>>>>> some
> >>>>>>> bugs (since there are no validations as such).
> >>>>>>> 3) I also updated the Test Plan section based on the suggestion
> from
> >>>>>> Bruno.
> >>>>>>> 4) I updated the default implementation of partitions method based
> on
> >>>>> the
> >>>>>>> great catch from Chris!
> >>>>>>>
> >>>>>>> Let me know if it looks fine now.
> >>>>>>>
> >>>>>>> Thanks!
> >>>>>>> Sagar.
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <ca...@apache.org>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> I am favour of discarding the sugar for broadcasting and leave the
> >>>>>>>> broadcasting to the implementation as Chris suggests. I think that
> >> is
> >>>>>>>> the cleanest option.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Bruno
> >>>>>>>>
> >>>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> I think it'd be useful to be more explicit about broadcasting to
> >> all
> >>>>>>>> topic
> >>>>>>>>> partitions rather than add implicit behavior for empty cases
> (empty
> >>>>>>>>> optional, empty list, etc.). The suggested enum approach would
> >>>>> address
> >>>>>>>> that
> >>>>>>>>> nicely.
> >>>>>>>>>
> >>>>>>>>> It's also worth noting that there's no hard requirement to add
> >> sugar
> >>>>>> for
> >>>>>>>>> broadcasting to all topic partitions since the API already
> provides
> >>>>> the
> >>>>>>>>> number of topic partitions available when calling a stream
> >>>>> partitioner.
> >>>>>>>> If
> >>>>>>>>> we can't find a clean way to add this support, it might be better
> >> to
> >>>>>>>> leave
> >>>>>>>>> it out and just let people implement this themselves with a line
> of
> >>>>>> Java
> >>>>>>>> 8
> >>>>>>>>> streams:
> >>>>>>>>>
> >>>>>>>>>         return IntStream.range(0,
> >>>>>>>>> numPartitions).boxed().collect(Collectors.toList());
> >>>>>>>>>
> >>>>>>>>> Also worth noting that there may be a bug in the default
> >>>>> implementation
> >>>>>>>> for
> >>>>>>>>> the new StreamPartitioner::partitions method, since it doesn't
> >>>>> appear
> >>>>>> to
> >>>>>>>>> correctly pick up on null return values from the partition method
> >>>>> and
> >>>>>>>>> translate them into empty lists.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>>
> >>>>>>>>> Chris
> >>>>>>>>>
> >>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <
> cadonna@apache.org>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Sagar,
> >>>>>>>>>>
> >>>>>>>>>> I do not see an issue with using an empty list together with an
> >>>>> empty
> >>>>>>>>>> Optional. A non-empty Optional that contains an empty list would
> >>>>> just
> >>>>>>>>>> say that there is no partition to which the record should be
> sent.
> >>>>> If
> >>>>>>>>>> there is no partition, the record is dropped.
> >>>>>>>>>>
> >>>>>>>>>> An empty Optional might be a way to say, broadcast to all
> >>>>> partitions.
> >>>>>>>>>>
> >>>>>>>>>> Alternatively -- to make it more explicit -- you could return an
> >>>>>> object
> >>>>>>>>>> with an enum and a list of partitions. The enum could have
> values
> >>>>>> SOME,
> >>>>>>>>>> ALL, and NONE. If the value is SOME, the list of partitions
> >>>>> contains
> >>>>>> the
> >>>>>>>>>> partitions to which to send the record. If the value of the enum
> >> is
> >>>>>> ALL
> >>>>>>>>>> or NONE, the list of partitions is not used and might be even
> null
> >>>>>>>>>> (since in that case the list should not be used and it would be
> a
> >>>>> bug
> >>>>>> to
> >>>>>>>>>> use it).
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Bruno
> >>>>>>>>>>
> >>>>>>>>>> On 24.08.22 20:11, Sagar wrote:
> >>>>>>>>>>> Thank you Bruno/Matthew for your comments.
> >>>>>>>>>>>
> >>>>>>>>>>> I agree using null does seem error prone. However I think
> using a
> >>>>>>>>>> singleton
> >>>>>>>>>>> list of [-1] might be better in terms of usability, I am saying
> >>>>> this
> >>>>>>>>>>> because the KIP also has a provision to return an empty list to
> >>>>> refer
> >>>>>>>> to
> >>>>>>>>>>> dropping the record. So, an empty optional and an empty list
> have
> >>>>>>>> totally
> >>>>>>>>>>> different meanings which could get confusing.
> >>>>>>>>>>>
> >>>>>>>>>>> Let me know what you think.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks!
> >>>>>>>>>>> Sagar.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
> >>>>>>>>>>> <ma...@aiven.io.invalid> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I also concur with this, having an Optional in the type makes
> it
> >>>>>> very
> >>>>>>>>>>>> clear what’s going on and better signifies an absence of value
> >>>>> (or
> >>>>>> in
> >>>>>>>>>> this
> >>>>>>>>>>>> case the broadcast value).
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> Matthew de Detrich
> >>>>>>>>>>>> Aiven Deutschland GmbH
> >>>>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin
> >>>>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
> >>>>>>>>>>>>
> >>>>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> >>>>>>>>>>>> m: +491603708037
> >>>>>>>>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
> >>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2.
> >>>>>>>>>>>>> I would prefer changing the return type of partitions() to
> >>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as the
> >>>>> broadcast
> >>>>>>>>>>>>> value. IMO, The chances that an implementation returns null
> due
> >>>>> to
> >>>>>> a
> >>>>>>>>>> bug
> >>>>>>>>>>>>> is much higher than that an implementation returns an empty
> >>>>>> Optional
> >>>>>>>>>> due
> >>>>>>>>>>>>> to a bug.
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Bruno Cadonna <ca...@apache.org>.
Hi Sagar,

Thanks for the update and the PR!

+1 (binding)

Best,
Bruno

On 10.09.22 18:57, Sagar wrote:
> Hi Bruno,
> 
> Thanks, I think these changes make sense to me. I have updated the KIP
> accordingly.
> 
> Thanks!
> Sagar.
> 
> On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <ca...@apache.org> wrote:
> 
>> Hi Sagar,
>>
>> I would not drop the support for dropping records. I would also not
>> return null from partitions(). Maybe an Optional can help here. An empty
>> Optional would mean to use the default partitioning behavior of the
>> producer. So we would have:
>>
>> - non-empty Optional, non-empty list of integers: partitions to send the
>> record to
>> - non-empty Optional, empty list of integers: drop the record
>> - empty Optional: use default behavior
>>
>> What do other think?
>>
>> Best,
>> Bruno
>>
>> On 02.09.22 13:53, Sagar wrote:
>>> Hello Bruno/Chris,
>>>
>>> Since these are the last set of changes(I am assuming haha), it would be
>>> great if you could review the 2 options from above so that we can close
>> the
>>> voting. Of course I am happy to incorporate any other requisite changes.
>>>
>>> Thanks!
>>> Sagar.
>>>
>>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <sa...@gmail.com>
>> wrote:
>>>
>>>> Thanks Bruno for the great points.
>>>>
>>>> I see 2 options here =>
>>>>
>>>> 1) As Chris suggested, drop the support for dropping records in the
>>>> partitioner. That way, an empty list could signify the usage of a
>> default
>>>> partitioner. Also, if the deprecated partition() method returns null
>>>> thereby signifying the default partitioner, the partitions() can return
>> an
>>>> empty list i.e default partitioner.
>>>>
>>>> 2) OR we treat a null return type of partitions() method to signify the
>>>> usage of the default partitioner. In the default implementation of
>>>> partitions() method, if partition() returns null, then even partitions()
>>>> can return null(instead of an empty list). The RecordCollectorImpl code
>> can
>>>> also be modified accordingly. @Chris, to your point, we can even drop
>> the
>>>> support of dropping of records. It came up during KIP discussion, and I
>>>> thought it might be a useful feature. Let me know what you think.
>>>>
>>>> 3) Lastly about the partition number check. I wanted to avoid the
>> throwing
>>>> of exception so I thought adding it might be a useful feature. But as
>> you
>>>> pointed out, if it can break backwards compatibility, it's better to
>> remove
>>>> it.
>>>>
>>>> Thanks!
>>>> Sagar.
>>>>
>>>>
>>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton <ch...@aiven.io.invalid>
>>>> wrote:
>>>>
>>>>> +1 to Bruno's concerns about backward compatibility. Do we actually
>> need
>>>>> support for dropping records in the partitioner? It doesn't seem
>> necessary
>>>>> based on the motivation for the KIP. If we remove that feature, we
>> could
>>>>> handle null and/or empty lists by using the default partitioning,
>>>>> equivalent to how we handle null return values from the existing
>> partition
>>>>> method today.
>>>>>
>>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <ca...@apache.org>
>> wrote:
>>>>>
>>>>>> Hi Sagar,
>>>>>>
>>>>>> Thank you for the updates!
>>>>>>
>>>>>> I do not intend to prolong this vote thread more than needed, but I
>>>>>> still have some points.
>>>>>>
>>>>>> The deprecated partition method can return null if the default
>>>>>> partitioning logic of the producer should be used.
>>>>>> With the new method partitions() it seems that it is not possible to
>> use
>>>>>> the default partitioning logic, anymore.
>>>>>>
>>>>>> Also, in the default implementation of method partitions(), a record
>>>>>> that would use the default partitioning logic in method partition()
>>>>>> would be dropped, which would break backward compatibility since
>> Streams
>>>>>> would always call the new method partitions() even though the users
>>>>>> still implement the deprecated method partition().
>>>>>>
>>>>>> I have a last point that we should probably discuss on the PR and not
>> on
>>>>>> the KIP but since you added the code in the KIP I need to mention it.
>> I
>>>>>> do not think you should check the validity of the partition number
>> since
>>>>>> the ProducerRecord does the same check and throws an exception. If
>>>>>> Streams adds the same check but does not throw, the behavior is not
>>>>>> backward compatible.
>>>>>>
>>>>>> Best,
>>>>>> Bruno
>>>>>>
>>>>>>
>>>>>> On 30.08.22 12:43, Sagar wrote:
>>>>>>> Thanks Bruno/Chris,
>>>>>>>
>>>>>>> Even I agree that might be better to keep it simple like the way
>> Chris
>>>>>>> suggested. I have updated the KIP accordingly. I made couple of minor
>>>>>>> changes to the KIP:
>>>>>>>
>>>>>>> 1) One of them being the change of return type of partitions method
>>>>> from
>>>>>>> List to Set. This is to ensure that in case the implementation of
>>>>>>> StreamPartitoner is buggy and ends up returning duplicate
>>>>>>> partition numbers, we won't have duplicates thereby not trying to
>>>>> send to
>>>>>>> the same partition multiple times due to this.
>>>>>>> 2) I also added a check to send the record only to valid partition
>>>>>> numbers
>>>>>>> and log and drop when the partition number is invalid. This is again
>>>>> to
>>>>>>> prevent errors for cases when the StreamPartitioner implementation
>> has
>>>>>> some
>>>>>>> bugs (since there are no validations as such).
>>>>>>> 3) I also updated the Test Plan section based on the suggestion from
>>>>>> Bruno.
>>>>>>> 4) I updated the default implementation of partitions method based on
>>>>> the
>>>>>>> great catch from Chris!
>>>>>>>
>>>>>>> Let me know if it looks fine now.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Sagar.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <ca...@apache.org>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am favour of discarding the sugar for broadcasting and leave the
>>>>>>>> broadcasting to the implementation as Chris suggests. I think that
>> is
>>>>>>>> the cleanest option.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Bruno
>>>>>>>>
>>>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I think it'd be useful to be more explicit about broadcasting to
>> all
>>>>>>>> topic
>>>>>>>>> partitions rather than add implicit behavior for empty cases (empty
>>>>>>>>> optional, empty list, etc.). The suggested enum approach would
>>>>> address
>>>>>>>> that
>>>>>>>>> nicely.
>>>>>>>>>
>>>>>>>>> It's also worth noting that there's no hard requirement to add
>> sugar
>>>>>> for
>>>>>>>>> broadcasting to all topic partitions since the API already provides
>>>>> the
>>>>>>>>> number of topic partitions available when calling a stream
>>>>> partitioner.
>>>>>>>> If
>>>>>>>>> we can't find a clean way to add this support, it might be better
>> to
>>>>>>>> leave
>>>>>>>>> it out and just let people implement this themselves with a line of
>>>>>> Java
>>>>>>>> 8
>>>>>>>>> streams:
>>>>>>>>>
>>>>>>>>>         return IntStream.range(0,
>>>>>>>>> numPartitions).boxed().collect(Collectors.toList());
>>>>>>>>>
>>>>>>>>> Also worth noting that there may be a bug in the default
>>>>> implementation
>>>>>>>> for
>>>>>>>>> the new StreamPartitioner::partitions method, since it doesn't
>>>>> appear
>>>>>> to
>>>>>>>>> correctly pick up on null return values from the partition method
>>>>> and
>>>>>>>>> translate them into empty lists.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Chris
>>>>>>>>>
>>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <ca...@apache.org>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Sagar,
>>>>>>>>>>
>>>>>>>>>> I do not see an issue with using an empty list together with an
>>>>> empty
>>>>>>>>>> Optional. A non-empty Optional that contains an empty list would
>>>>> just
>>>>>>>>>> say that there is no partition to which the record should be sent.
>>>>> If
>>>>>>>>>> there is no partition, the record is dropped.
>>>>>>>>>>
>>>>>>>>>> An empty Optional might be a way to say, broadcast to all
>>>>> partitions.
>>>>>>>>>>
>>>>>>>>>> Alternatively -- to make it more explicit -- you could return an
>>>>>> object
>>>>>>>>>> with an enum and a list of partitions. The enum could have values
>>>>>> SOME,
>>>>>>>>>> ALL, and NONE. If the value is SOME, the list of partitions
>>>>> contains
>>>>>> the
>>>>>>>>>> partitions to which to send the record. If the value of the enum
>> is
>>>>>> ALL
>>>>>>>>>> or NONE, the list of partitions is not used and might be even null
>>>>>>>>>> (since in that case the list should not be used and it would be a
>>>>> bug
>>>>>> to
>>>>>>>>>> use it).
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Bruno
>>>>>>>>>>
>>>>>>>>>> On 24.08.22 20:11, Sagar wrote:
>>>>>>>>>>> Thank you Bruno/Matthew for your comments.
>>>>>>>>>>>
>>>>>>>>>>> I agree using null does seem error prone. However I think using a
>>>>>>>>>> singleton
>>>>>>>>>>> list of [-1] might be better in terms of usability, I am saying
>>>>> this
>>>>>>>>>>> because the KIP also has a provision to return an empty list to
>>>>> refer
>>>>>>>> to
>>>>>>>>>>> dropping the record. So, an empty optional and an empty list have
>>>>>>>> totally
>>>>>>>>>>> different meanings which could get confusing.
>>>>>>>>>>>
>>>>>>>>>>> Let me know what you think.
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>> Sagar.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
>>>>>>>>>>> <ma...@aiven.io.invalid> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I also concur with this, having an Optional in the type makes it
>>>>>> very
>>>>>>>>>>>> clear what’s going on and better signifies an absence of value
>>>>> (or
>>>>>> in
>>>>>>>>>> this
>>>>>>>>>>>> case the broadcast value).
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Matthew de Detrich
>>>>>>>>>>>> Aiven Deutschland GmbH
>>>>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin
>>>>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
>>>>>>>>>>>>
>>>>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>>>>>>>>>>>> m: +491603708037
>>>>>>>>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
>>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2.
>>>>>>>>>>>>> I would prefer changing the return type of partitions() to
>>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as the
>>>>> broadcast
>>>>>>>>>>>>> value. IMO, The chances that an implementation returns null due
>>>>> to
>>>>>> a
>>>>>>>>>> bug
>>>>>>>>>>>>> is much higher than that an implementation returns an empty
>>>>>> Optional
>>>>>>>>>> due
>>>>>>>>>>>>> to a bug.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> 

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sagar <sa...@gmail.com>.
Hi Bruno,

Thanks, I think these changes make sense to me. I have updated the KIP
accordingly.

Thanks!
Sagar.

On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <ca...@apache.org> wrote:

> Hi Sagar,
>
> I would not drop the support for dropping records. I would also not
> return null from partitions(). Maybe an Optional can help here. An empty
> Optional would mean to use the default partitioning behavior of the
> producer. So we would have:
>
> - non-empty Optional, non-empty list of integers: partitions to send the
> record to
> - non-empty Optional, empty list of integers: drop the record
> - empty Optional: use default behavior
>
> What do other think?
>
> Best,
> Bruno
>
> On 02.09.22 13:53, Sagar wrote:
> > Hello Bruno/Chris,
> >
> > Since these are the last set of changes(I am assuming haha), it would be
> > great if you could review the 2 options from above so that we can close
> the
> > voting. Of course I am happy to incorporate any other requisite changes.
> >
> > Thanks!
> > Sagar.
> >
> > On Wed, Aug 31, 2022 at 10:07 PM Sagar <sa...@gmail.com>
> wrote:
> >
> >> Thanks Bruno for the great points.
> >>
> >> I see 2 options here =>
> >>
> >> 1) As Chris suggested, drop the support for dropping records in the
> >> partitioner. That way, an empty list could signify the usage of a
> default
> >> partitioner. Also, if the deprecated partition() method returns null
> >> thereby signifying the default partitioner, the partitions() can return
> an
> >> empty list i.e default partitioner.
> >>
> >> 2) OR we treat a null return type of partitions() method to signify the
> >> usage of the default partitioner. In the default implementation of
> >> partitions() method, if partition() returns null, then even partitions()
> >> can return null(instead of an empty list). The RecordCollectorImpl code
> can
> >> also be modified accordingly. @Chris, to your point, we can even drop
> the
> >> support of dropping of records. It came up during KIP discussion, and I
> >> thought it might be a useful feature. Let me know what you think.
> >>
> >> 3) Lastly about the partition number check. I wanted to avoid the
> throwing
> >> of exception so I thought adding it might be a useful feature. But as
> you
> >> pointed out, if it can break backwards compatibility, it's better to
> remove
> >> it.
> >>
> >> Thanks!
> >> Sagar.
> >>
> >>
> >> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton <ch...@aiven.io.invalid>
> >> wrote:
> >>
> >>> +1 to Bruno's concerns about backward compatibility. Do we actually
> need
> >>> support for dropping records in the partitioner? It doesn't seem
> necessary
> >>> based on the motivation for the KIP. If we remove that feature, we
> could
> >>> handle null and/or empty lists by using the default partitioning,
> >>> equivalent to how we handle null return values from the existing
> partition
> >>> method today.
> >>>
> >>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <ca...@apache.org>
> wrote:
> >>>
> >>>> Hi Sagar,
> >>>>
> >>>> Thank you for the updates!
> >>>>
> >>>> I do not intend to prolong this vote thread more than needed, but I
> >>>> still have some points.
> >>>>
> >>>> The deprecated partition method can return null if the default
> >>>> partitioning logic of the producer should be used.
> >>>> With the new method partitions() it seems that it is not possible to
> use
> >>>> the default partitioning logic, anymore.
> >>>>
> >>>> Also, in the default implementation of method partitions(), a record
> >>>> that would use the default partitioning logic in method partition()
> >>>> would be dropped, which would break backward compatibility since
> Streams
> >>>> would always call the new method partitions() even though the users
> >>>> still implement the deprecated method partition().
> >>>>
> >>>> I have a last point that we should probably discuss on the PR and not
> on
> >>>> the KIP but since you added the code in the KIP I need to mention it.
> I
> >>>> do not think you should check the validity of the partition number
> since
> >>>> the ProducerRecord does the same check and throws an exception. If
> >>>> Streams adds the same check but does not throw, the behavior is not
> >>>> backward compatible.
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>>
> >>>> On 30.08.22 12:43, Sagar wrote:
> >>>>> Thanks Bruno/Chris,
> >>>>>
> >>>>> Even I agree that might be better to keep it simple like the way
> Chris
> >>>>> suggested. I have updated the KIP accordingly. I made couple of minor
> >>>>> changes to the KIP:
> >>>>>
> >>>>> 1) One of them being the change of return type of partitions method
> >>> from
> >>>>> List to Set. This is to ensure that in case the implementation of
> >>>>> StreamPartitoner is buggy and ends up returning duplicate
> >>>>> partition numbers, we won't have duplicates thereby not trying to
> >>> send to
> >>>>> the same partition multiple times due to this.
> >>>>> 2) I also added a check to send the record only to valid partition
> >>>> numbers
> >>>>> and log and drop when the partition number is invalid. This is again
> >>> to
> >>>>> prevent errors for cases when the StreamPartitioner implementation
> has
> >>>> some
> >>>>> bugs (since there are no validations as such).
> >>>>> 3) I also updated the Test Plan section based on the suggestion from
> >>>> Bruno.
> >>>>> 4) I updated the default implementation of partitions method based on
> >>> the
> >>>>> great catch from Chris!
> >>>>>
> >>>>> Let me know if it looks fine now.
> >>>>>
> >>>>> Thanks!
> >>>>> Sagar.
> >>>>>
> >>>>>
> >>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <ca...@apache.org>
> >>>> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> I am favour of discarding the sugar for broadcasting and leave the
> >>>>>> broadcasting to the implementation as Chris suggests. I think that
> is
> >>>>>> the cleanest option.
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> On 29.08.22 19:50, Chris Egerton wrote:
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> I think it'd be useful to be more explicit about broadcasting to
> all
> >>>>>> topic
> >>>>>>> partitions rather than add implicit behavior for empty cases (empty
> >>>>>>> optional, empty list, etc.). The suggested enum approach would
> >>> address
> >>>>>> that
> >>>>>>> nicely.
> >>>>>>>
> >>>>>>> It's also worth noting that there's no hard requirement to add
> sugar
> >>>> for
> >>>>>>> broadcasting to all topic partitions since the API already provides
> >>> the
> >>>>>>> number of topic partitions available when calling a stream
> >>> partitioner.
> >>>>>> If
> >>>>>>> we can't find a clean way to add this support, it might be better
> to
> >>>>>> leave
> >>>>>>> it out and just let people implement this themselves with a line of
> >>>> Java
> >>>>>> 8
> >>>>>>> streams:
> >>>>>>>
> >>>>>>>        return IntStream.range(0,
> >>>>>>> numPartitions).boxed().collect(Collectors.toList());
> >>>>>>>
> >>>>>>> Also worth noting that there may be a bug in the default
> >>> implementation
> >>>>>> for
> >>>>>>> the new StreamPartitioner::partitions method, since it doesn't
> >>> appear
> >>>> to
> >>>>>>> correctly pick up on null return values from the partition method
> >>> and
> >>>>>>> translate them into empty lists.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>>
> >>>>>>> Chris
> >>>>>>>
> >>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <ca...@apache.org>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Sagar,
> >>>>>>>>
> >>>>>>>> I do not see an issue with using an empty list together with an
> >>> empty
> >>>>>>>> Optional. A non-empty Optional that contains an empty list would
> >>> just
> >>>>>>>> say that there is no partition to which the record should be sent.
> >>> If
> >>>>>>>> there is no partition, the record is dropped.
> >>>>>>>>
> >>>>>>>> An empty Optional might be a way to say, broadcast to all
> >>> partitions.
> >>>>>>>>
> >>>>>>>> Alternatively -- to make it more explicit -- you could return an
> >>>> object
> >>>>>>>> with an enum and a list of partitions. The enum could have values
> >>>> SOME,
> >>>>>>>> ALL, and NONE. If the value is SOME, the list of partitions
> >>> contains
> >>>> the
> >>>>>>>> partitions to which to send the record. If the value of the enum
> is
> >>>> ALL
> >>>>>>>> or NONE, the list of partitions is not used and might be even null
> >>>>>>>> (since in that case the list should not be used and it would be a
> >>> bug
> >>>> to
> >>>>>>>> use it).
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Bruno
> >>>>>>>>
> >>>>>>>> On 24.08.22 20:11, Sagar wrote:
> >>>>>>>>> Thank you Bruno/Matthew for your comments.
> >>>>>>>>>
> >>>>>>>>> I agree using null does seem error prone. However I think using a
> >>>>>>>> singleton
> >>>>>>>>> list of [-1] might be better in terms of usability, I am saying
> >>> this
> >>>>>>>>> because the KIP also has a provision to return an empty list to
> >>> refer
> >>>>>> to
> >>>>>>>>> dropping the record. So, an empty optional and an empty list have
> >>>>>> totally
> >>>>>>>>> different meanings which could get confusing.
> >>>>>>>>>
> >>>>>>>>> Let me know what you think.
> >>>>>>>>>
> >>>>>>>>> Thanks!
> >>>>>>>>> Sagar.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
> >>>>>>>>> <ma...@aiven.io.invalid> wrote:
> >>>>>>>>>
> >>>>>>>>>> I also concur with this, having an Optional in the type makes it
> >>>> very
> >>>>>>>>>> clear what’s going on and better signifies an absence of value
> >>> (or
> >>>> in
> >>>>>>>> this
> >>>>>>>>>> case the broadcast value).
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> Matthew de Detrich
> >>>>>>>>>> Aiven Deutschland GmbH
> >>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin
> >>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
> >>>>>>>>>>
> >>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> >>>>>>>>>> m: +491603708037
> >>>>>>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
> >>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> 2.
> >>>>>>>>>>> I would prefer changing the return type of partitions() to
> >>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as the
> >>> broadcast
> >>>>>>>>>>> value. IMO, The chances that an implementation returns null due
> >>> to
> >>>> a
> >>>>>>>> bug
> >>>>>>>>>>> is much higher than that an implementation returns an empty
> >>>> Optional
> >>>>>>>> due
> >>>>>>>>>>> to a bug.
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Bruno Cadonna <ca...@apache.org>.
Hi Sagar,

I would not drop the support for dropping records. I would also not 
return null from partitions(). Maybe an Optional can help here. An empty 
Optional would mean to use the default partitioning behavior of the 
producer. So we would have:

- non-empty Optional, non-empty list of integers: partitions to send the 
record to
- non-empty Optional, empty list of integers: drop the record
- empty Optional: use default behavior

What do other think?

Best,
Bruno

On 02.09.22 13:53, Sagar wrote:
> Hello Bruno/Chris,
> 
> Since these are the last set of changes(I am assuming haha), it would be
> great if you could review the 2 options from above so that we can close the
> voting. Of course I am happy to incorporate any other requisite changes.
> 
> Thanks!
> Sagar.
> 
> On Wed, Aug 31, 2022 at 10:07 PM Sagar <sa...@gmail.com> wrote:
> 
>> Thanks Bruno for the great points.
>>
>> I see 2 options here =>
>>
>> 1) As Chris suggested, drop the support for dropping records in the
>> partitioner. That way, an empty list could signify the usage of a default
>> partitioner. Also, if the deprecated partition() method returns null
>> thereby signifying the default partitioner, the partitions() can return an
>> empty list i.e default partitioner.
>>
>> 2) OR we treat a null return type of partitions() method to signify the
>> usage of the default partitioner. In the default implementation of
>> partitions() method, if partition() returns null, then even partitions()
>> can return null(instead of an empty list). The RecordCollectorImpl code can
>> also be modified accordingly. @Chris, to your point, we can even drop the
>> support of dropping of records. It came up during KIP discussion, and I
>> thought it might be a useful feature. Let me know what you think.
>>
>> 3) Lastly about the partition number check. I wanted to avoid the throwing
>> of exception so I thought adding it might be a useful feature. But as you
>> pointed out, if it can break backwards compatibility, it's better to remove
>> it.
>>
>> Thanks!
>> Sagar.
>>
>>
>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton <ch...@aiven.io.invalid>
>> wrote:
>>
>>> +1 to Bruno's concerns about backward compatibility. Do we actually need
>>> support for dropping records in the partitioner? It doesn't seem necessary
>>> based on the motivation for the KIP. If we remove that feature, we could
>>> handle null and/or empty lists by using the default partitioning,
>>> equivalent to how we handle null return values from the existing partition
>>> method today.
>>>
>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <ca...@apache.org> wrote:
>>>
>>>> Hi Sagar,
>>>>
>>>> Thank you for the updates!
>>>>
>>>> I do not intend to prolong this vote thread more than needed, but I
>>>> still have some points.
>>>>
>>>> The deprecated partition method can return null if the default
>>>> partitioning logic of the producer should be used.
>>>> With the new method partitions() it seems that it is not possible to use
>>>> the default partitioning logic, anymore.
>>>>
>>>> Also, in the default implementation of method partitions(), a record
>>>> that would use the default partitioning logic in method partition()
>>>> would be dropped, which would break backward compatibility since Streams
>>>> would always call the new method partitions() even though the users
>>>> still implement the deprecated method partition().
>>>>
>>>> I have a last point that we should probably discuss on the PR and not on
>>>> the KIP but since you added the code in the KIP I need to mention it. I
>>>> do not think you should check the validity of the partition number since
>>>> the ProducerRecord does the same check and throws an exception. If
>>>> Streams adds the same check but does not throw, the behavior is not
>>>> backward compatible.
>>>>
>>>> Best,
>>>> Bruno
>>>>
>>>>
>>>> On 30.08.22 12:43, Sagar wrote:
>>>>> Thanks Bruno/Chris,
>>>>>
>>>>> Even I agree that might be better to keep it simple like the way Chris
>>>>> suggested. I have updated the KIP accordingly. I made couple of minor
>>>>> changes to the KIP:
>>>>>
>>>>> 1) One of them being the change of return type of partitions method
>>> from
>>>>> List to Set. This is to ensure that in case the implementation of
>>>>> StreamPartitoner is buggy and ends up returning duplicate
>>>>> partition numbers, we won't have duplicates thereby not trying to
>>> send to
>>>>> the same partition multiple times due to this.
>>>>> 2) I also added a check to send the record only to valid partition
>>>> numbers
>>>>> and log and drop when the partition number is invalid. This is again
>>> to
>>>>> prevent errors for cases when the StreamPartitioner implementation has
>>>> some
>>>>> bugs (since there are no validations as such).
>>>>> 3) I also updated the Test Plan section based on the suggestion from
>>>> Bruno.
>>>>> 4) I updated the default implementation of partitions method based on
>>> the
>>>>> great catch from Chris!
>>>>>
>>>>> Let me know if it looks fine now.
>>>>>
>>>>> Thanks!
>>>>> Sagar.
>>>>>
>>>>>
>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <ca...@apache.org>
>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am favour of discarding the sugar for broadcasting and leave the
>>>>>> broadcasting to the implementation as Chris suggests. I think that is
>>>>>> the cleanest option.
>>>>>>
>>>>>> Best,
>>>>>> Bruno
>>>>>>
>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I think it'd be useful to be more explicit about broadcasting to all
>>>>>> topic
>>>>>>> partitions rather than add implicit behavior for empty cases (empty
>>>>>>> optional, empty list, etc.). The suggested enum approach would
>>> address
>>>>>> that
>>>>>>> nicely.
>>>>>>>
>>>>>>> It's also worth noting that there's no hard requirement to add sugar
>>>> for
>>>>>>> broadcasting to all topic partitions since the API already provides
>>> the
>>>>>>> number of topic partitions available when calling a stream
>>> partitioner.
>>>>>> If
>>>>>>> we can't find a clean way to add this support, it might be better to
>>>>>> leave
>>>>>>> it out and just let people implement this themselves with a line of
>>>> Java
>>>>>> 8
>>>>>>> streams:
>>>>>>>
>>>>>>>        return IntStream.range(0,
>>>>>>> numPartitions).boxed().collect(Collectors.toList());
>>>>>>>
>>>>>>> Also worth noting that there may be a bug in the default
>>> implementation
>>>>>> for
>>>>>>> the new StreamPartitioner::partitions method, since it doesn't
>>> appear
>>>> to
>>>>>>> correctly pick up on null return values from the partition method
>>> and
>>>>>>> translate them into empty lists.
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Chris
>>>>>>>
>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <ca...@apache.org>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Sagar,
>>>>>>>>
>>>>>>>> I do not see an issue with using an empty list together with an
>>> empty
>>>>>>>> Optional. A non-empty Optional that contains an empty list would
>>> just
>>>>>>>> say that there is no partition to which the record should be sent.
>>> If
>>>>>>>> there is no partition, the record is dropped.
>>>>>>>>
>>>>>>>> An empty Optional might be a way to say, broadcast to all
>>> partitions.
>>>>>>>>
>>>>>>>> Alternatively -- to make it more explicit -- you could return an
>>>> object
>>>>>>>> with an enum and a list of partitions. The enum could have values
>>>> SOME,
>>>>>>>> ALL, and NONE. If the value is SOME, the list of partitions
>>> contains
>>>> the
>>>>>>>> partitions to which to send the record. If the value of the enum is
>>>> ALL
>>>>>>>> or NONE, the list of partitions is not used and might be even null
>>>>>>>> (since in that case the list should not be used and it would be a
>>> bug
>>>> to
>>>>>>>> use it).
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Bruno
>>>>>>>>
>>>>>>>> On 24.08.22 20:11, Sagar wrote:
>>>>>>>>> Thank you Bruno/Matthew for your comments.
>>>>>>>>>
>>>>>>>>> I agree using null does seem error prone. However I think using a
>>>>>>>> singleton
>>>>>>>>> list of [-1] might be better in terms of usability, I am saying
>>> this
>>>>>>>>> because the KIP also has a provision to return an empty list to
>>> refer
>>>>>> to
>>>>>>>>> dropping the record. So, an empty optional and an empty list have
>>>>>> totally
>>>>>>>>> different meanings which could get confusing.
>>>>>>>>>
>>>>>>>>> Let me know what you think.
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> Sagar.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
>>>>>>>>> <ma...@aiven.io.invalid> wrote:
>>>>>>>>>
>>>>>>>>>> I also concur with this, having an Optional in the type makes it
>>>> very
>>>>>>>>>> clear what’s going on and better signifies an absence of value
>>> (or
>>>> in
>>>>>>>> this
>>>>>>>>>> case the broadcast value).
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Matthew de Detrich
>>>>>>>>>> Aiven Deutschland GmbH
>>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin
>>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
>>>>>>>>>>
>>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>>>>>>>>>> m: +491603708037
>>>>>>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
>>>>>>>>>>>
>>>>>>>>>>> 2.
>>>>>>>>>>> I would prefer changing the return type of partitions() to
>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as the
>>> broadcast
>>>>>>>>>>> value. IMO, The chances that an implementation returns null due
>>> to
>>>> a
>>>>>>>> bug
>>>>>>>>>>> is much higher than that an implementation returns an empty
>>>> Optional
>>>>>>>> due
>>>>>>>>>>> to a bug.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> 

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sagar <sa...@gmail.com>.
Hello Bruno/Chris,

Since these are the last set of changes(I am assuming haha), it would be
great if you could review the 2 options from above so that we can close the
voting. Of course I am happy to incorporate any other requisite changes.

Thanks!
Sagar.

On Wed, Aug 31, 2022 at 10:07 PM Sagar <sa...@gmail.com> wrote:

> Thanks Bruno for the great points.
>
> I see 2 options here =>
>
> 1) As Chris suggested, drop the support for dropping records in the
> partitioner. That way, an empty list could signify the usage of a default
> partitioner. Also, if the deprecated partition() method returns null
> thereby signifying the default partitioner, the partitions() can return an
> empty list i.e default partitioner.
>
> 2) OR we treat a null return type of partitions() method to signify the
> usage of the default partitioner. In the default implementation of
> partitions() method, if partition() returns null, then even partitions()
> can return null(instead of an empty list). The RecordCollectorImpl code can
> also be modified accordingly. @Chris, to your point, we can even drop the
> support of dropping of records. It came up during KIP discussion, and I
> thought it might be a useful feature. Let me know what you think.
>
> 3) Lastly about the partition number check. I wanted to avoid the throwing
> of exception so I thought adding it might be a useful feature. But as you
> pointed out, if it can break backwards compatibility, it's better to remove
> it.
>
> Thanks!
> Sagar.
>
>
> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton <ch...@aiven.io.invalid>
> wrote:
>
>> +1 to Bruno's concerns about backward compatibility. Do we actually need
>> support for dropping records in the partitioner? It doesn't seem necessary
>> based on the motivation for the KIP. If we remove that feature, we could
>> handle null and/or empty lists by using the default partitioning,
>> equivalent to how we handle null return values from the existing partition
>> method today.
>>
>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <ca...@apache.org> wrote:
>>
>> > Hi Sagar,
>> >
>> > Thank you for the updates!
>> >
>> > I do not intend to prolong this vote thread more than needed, but I
>> > still have some points.
>> >
>> > The deprecated partition method can return null if the default
>> > partitioning logic of the producer should be used.
>> > With the new method partitions() it seems that it is not possible to use
>> > the default partitioning logic, anymore.
>> >
>> > Also, in the default implementation of method partitions(), a record
>> > that would use the default partitioning logic in method partition()
>> > would be dropped, which would break backward compatibility since Streams
>> > would always call the new method partitions() even though the users
>> > still implement the deprecated method partition().
>> >
>> > I have a last point that we should probably discuss on the PR and not on
>> > the KIP but since you added the code in the KIP I need to mention it. I
>> > do not think you should check the validity of the partition number since
>> > the ProducerRecord does the same check and throws an exception. If
>> > Streams adds the same check but does not throw, the behavior is not
>> > backward compatible.
>> >
>> > Best,
>> > Bruno
>> >
>> >
>> > On 30.08.22 12:43, Sagar wrote:
>> > > Thanks Bruno/Chris,
>> > >
>> > > Even I agree that might be better to keep it simple like the way Chris
>> > > suggested. I have updated the KIP accordingly. I made couple of minor
>> > > changes to the KIP:
>> > >
>> > > 1) One of them being the change of return type of partitions method
>> from
>> > > List to Set. This is to ensure that in case the implementation of
>> > > StreamPartitoner is buggy and ends up returning duplicate
>> > > partition numbers, we won't have duplicates thereby not trying to
>> send to
>> > > the same partition multiple times due to this.
>> > > 2) I also added a check to send the record only to valid partition
>> > numbers
>> > > and log and drop when the partition number is invalid. This is again
>> to
>> > > prevent errors for cases when the StreamPartitioner implementation has
>> > some
>> > > bugs (since there are no validations as such).
>> > > 3) I also updated the Test Plan section based on the suggestion from
>> > Bruno.
>> > > 4) I updated the default implementation of partitions method based on
>> the
>> > > great catch from Chris!
>> > >
>> > > Let me know if it looks fine now.
>> > >
>> > > Thanks!
>> > > Sagar.
>> > >
>> > >
>> > > On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <ca...@apache.org>
>> > wrote:
>> > >
>> > >> Hi,
>> > >>
>> > >> I am favour of discarding the sugar for broadcasting and leave the
>> > >> broadcasting to the implementation as Chris suggests. I think that is
>> > >> the cleanest option.
>> > >>
>> > >> Best,
>> > >> Bruno
>> > >>
>> > >> On 29.08.22 19:50, Chris Egerton wrote:
>> > >>> Hi all,
>> > >>>
>> > >>> I think it'd be useful to be more explicit about broadcasting to all
>> > >> topic
>> > >>> partitions rather than add implicit behavior for empty cases (empty
>> > >>> optional, empty list, etc.). The suggested enum approach would
>> address
>> > >> that
>> > >>> nicely.
>> > >>>
>> > >>> It's also worth noting that there's no hard requirement to add sugar
>> > for
>> > >>> broadcasting to all topic partitions since the API already provides
>> the
>> > >>> number of topic partitions available when calling a stream
>> partitioner.
>> > >> If
>> > >>> we can't find a clean way to add this support, it might be better to
>> > >> leave
>> > >>> it out and just let people implement this themselves with a line of
>> > Java
>> > >> 8
>> > >>> streams:
>> > >>>
>> > >>>       return IntStream.range(0,
>> > >>> numPartitions).boxed().collect(Collectors.toList());
>> > >>>
>> > >>> Also worth noting that there may be a bug in the default
>> implementation
>> > >> for
>> > >>> the new StreamPartitioner::partitions method, since it doesn't
>> appear
>> > to
>> > >>> correctly pick up on null return values from the partition method
>> and
>> > >>> translate them into empty lists.
>> > >>>
>> > >>> Cheers,
>> > >>>
>> > >>> Chris
>> > >>>
>> > >>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <ca...@apache.org>
>> > >> wrote:
>> > >>>
>> > >>>> Hi Sagar,
>> > >>>>
>> > >>>> I do not see an issue with using an empty list together with an
>> empty
>> > >>>> Optional. A non-empty Optional that contains an empty list would
>> just
>> > >>>> say that there is no partition to which the record should be sent.
>> If
>> > >>>> there is no partition, the record is dropped.
>> > >>>>
>> > >>>> An empty Optional might be a way to say, broadcast to all
>> partitions.
>> > >>>>
>> > >>>> Alternatively -- to make it more explicit -- you could return an
>> > object
>> > >>>> with an enum and a list of partitions. The enum could have values
>> > SOME,
>> > >>>> ALL, and NONE. If the value is SOME, the list of partitions
>> contains
>> > the
>> > >>>> partitions to which to send the record. If the value of the enum is
>> > ALL
>> > >>>> or NONE, the list of partitions is not used and might be even null
>> > >>>> (since in that case the list should not be used and it would be a
>> bug
>> > to
>> > >>>> use it).
>> > >>>>
>> > >>>> Best,
>> > >>>> Bruno
>> > >>>>
>> > >>>> On 24.08.22 20:11, Sagar wrote:
>> > >>>>> Thank you Bruno/Matthew for your comments.
>> > >>>>>
>> > >>>>> I agree using null does seem error prone. However I think using a
>> > >>>> singleton
>> > >>>>> list of [-1] might be better in terms of usability, I am saying
>> this
>> > >>>>> because the KIP also has a provision to return an empty list to
>> refer
>> > >> to
>> > >>>>> dropping the record. So, an empty optional and an empty list have
>> > >> totally
>> > >>>>> different meanings which could get confusing.
>> > >>>>>
>> > >>>>> Let me know what you think.
>> > >>>>>
>> > >>>>> Thanks!
>> > >>>>> Sagar.
>> > >>>>>
>> > >>>>>
>> > >>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
>> > >>>>> <ma...@aiven.io.invalid> wrote:
>> > >>>>>
>> > >>>>>> I also concur with this, having an Optional in the type makes it
>> > very
>> > >>>>>> clear what’s going on and better signifies an absence of value
>> (or
>> > in
>> > >>>> this
>> > >>>>>> case the broadcast value).
>> > >>>>>>
>> > >>>>>> --
>> > >>>>>> Matthew de Detrich
>> > >>>>>> Aiven Deutschland GmbH
>> > >>>>>> Immanuelkirchstraße 26, 10405 Berlin
>> > >>>>>> Amtsgericht Charlottenburg, HRB 209739 B
>> > >>>>>>
>> > >>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>> > >>>>>> m: +491603708037
>> > >>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
>> > >>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
>> > >>>>>>>
>> > >>>>>>> 2.
>> > >>>>>>> I would prefer changing the return type of partitions() to
>> > >>>>>>> Optional<List<Integer>> and using Optional.empty() as the
>> broadcast
>> > >>>>>>> value. IMO, The chances that an implementation returns null due
>> to
>> > a
>> > >>>> bug
>> > >>>>>>> is much higher than that an implementation returns an empty
>> > Optional
>> > >>>> due
>> > >>>>>>> to a bug.
>> > >>>>>>
>> > >>>>>
>> > >>>>
>> > >>>
>> > >>
>> > >
>> >
>>
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sagar <sa...@gmail.com>.
Thanks Bruno for the great points.

I see 2 options here =>

1) As Chris suggested, drop the support for dropping records in the
partitioner. That way, an empty list could signify the usage of a default
partitioner. Also, if the deprecated partition() method returns null
thereby signifying the default partitioner, the partitions() can return an
empty list i.e default partitioner.

2) OR we treat a null return type of partitions() method to signify the
usage of the default partitioner. In the default implementation of
partitions() method, if partition() returns null, then even partitions()
can return null(instead of an empty list). The RecordCollectorImpl code can
also be modified accordingly. @Chris, to your point, we can even drop the
support of dropping of records. It came up during KIP discussion, and I
thought it might be a useful feature. Let me know what you think.

3) Lastly about the partition number check. I wanted to avoid the throwing
of exception so I thought adding it might be a useful feature. But as you
pointed out, if it can break backwards compatibility, it's better to remove
it.

Thanks!
Sagar.


On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton <ch...@aiven.io.invalid>
wrote:

> +1 to Bruno's concerns about backward compatibility. Do we actually need
> support for dropping records in the partitioner? It doesn't seem necessary
> based on the motivation for the KIP. If we remove that feature, we could
> handle null and/or empty lists by using the default partitioning,
> equivalent to how we handle null return values from the existing partition
> method today.
>
> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <ca...@apache.org> wrote:
>
> > Hi Sagar,
> >
> > Thank you for the updates!
> >
> > I do not intend to prolong this vote thread more than needed, but I
> > still have some points.
> >
> > The deprecated partition method can return null if the default
> > partitioning logic of the producer should be used.
> > With the new method partitions() it seems that it is not possible to use
> > the default partitioning logic, anymore.
> >
> > Also, in the default implementation of method partitions(), a record
> > that would use the default partitioning logic in method partition()
> > would be dropped, which would break backward compatibility since Streams
> > would always call the new method partitions() even though the users
> > still implement the deprecated method partition().
> >
> > I have a last point that we should probably discuss on the PR and not on
> > the KIP but since you added the code in the KIP I need to mention it. I
> > do not think you should check the validity of the partition number since
> > the ProducerRecord does the same check and throws an exception. If
> > Streams adds the same check but does not throw, the behavior is not
> > backward compatible.
> >
> > Best,
> > Bruno
> >
> >
> > On 30.08.22 12:43, Sagar wrote:
> > > Thanks Bruno/Chris,
> > >
> > > Even I agree that might be better to keep it simple like the way Chris
> > > suggested. I have updated the KIP accordingly. I made couple of minor
> > > changes to the KIP:
> > >
> > > 1) One of them being the change of return type of partitions method
> from
> > > List to Set. This is to ensure that in case the implementation of
> > > StreamPartitoner is buggy and ends up returning duplicate
> > > partition numbers, we won't have duplicates thereby not trying to send
> to
> > > the same partition multiple times due to this.
> > > 2) I also added a check to send the record only to valid partition
> > numbers
> > > and log and drop when the partition number is invalid. This is again to
> > > prevent errors for cases when the StreamPartitioner implementation has
> > some
> > > bugs (since there are no validations as such).
> > > 3) I also updated the Test Plan section based on the suggestion from
> > Bruno.
> > > 4) I updated the default implementation of partitions method based on
> the
> > > great catch from Chris!
> > >
> > > Let me know if it looks fine now.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > >
> > > On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <ca...@apache.org>
> > wrote:
> > >
> > >> Hi,
> > >>
> > >> I am favour of discarding the sugar for broadcasting and leave the
> > >> broadcasting to the implementation as Chris suggests. I think that is
> > >> the cleanest option.
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >> On 29.08.22 19:50, Chris Egerton wrote:
> > >>> Hi all,
> > >>>
> > >>> I think it'd be useful to be more explicit about broadcasting to all
> > >> topic
> > >>> partitions rather than add implicit behavior for empty cases (empty
> > >>> optional, empty list, etc.). The suggested enum approach would
> address
> > >> that
> > >>> nicely.
> > >>>
> > >>> It's also worth noting that there's no hard requirement to add sugar
> > for
> > >>> broadcasting to all topic partitions since the API already provides
> the
> > >>> number of topic partitions available when calling a stream
> partitioner.
> > >> If
> > >>> we can't find a clean way to add this support, it might be better to
> > >> leave
> > >>> it out and just let people implement this themselves with a line of
> > Java
> > >> 8
> > >>> streams:
> > >>>
> > >>>       return IntStream.range(0,
> > >>> numPartitions).boxed().collect(Collectors.toList());
> > >>>
> > >>> Also worth noting that there may be a bug in the default
> implementation
> > >> for
> > >>> the new StreamPartitioner::partitions method, since it doesn't appear
> > to
> > >>> correctly pick up on null return values from the partition method and
> > >>> translate them into empty lists.
> > >>>
> > >>> Cheers,
> > >>>
> > >>> Chris
> > >>>
> > >>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <ca...@apache.org>
> > >> wrote:
> > >>>
> > >>>> Hi Sagar,
> > >>>>
> > >>>> I do not see an issue with using an empty list together with an
> empty
> > >>>> Optional. A non-empty Optional that contains an empty list would
> just
> > >>>> say that there is no partition to which the record should be sent.
> If
> > >>>> there is no partition, the record is dropped.
> > >>>>
> > >>>> An empty Optional might be a way to say, broadcast to all
> partitions.
> > >>>>
> > >>>> Alternatively -- to make it more explicit -- you could return an
> > object
> > >>>> with an enum and a list of partitions. The enum could have values
> > SOME,
> > >>>> ALL, and NONE. If the value is SOME, the list of partitions contains
> > the
> > >>>> partitions to which to send the record. If the value of the enum is
> > ALL
> > >>>> or NONE, the list of partitions is not used and might be even null
> > >>>> (since in that case the list should not be used and it would be a
> bug
> > to
> > >>>> use it).
> > >>>>
> > >>>> Best,
> > >>>> Bruno
> > >>>>
> > >>>> On 24.08.22 20:11, Sagar wrote:
> > >>>>> Thank you Bruno/Matthew for your comments.
> > >>>>>
> > >>>>> I agree using null does seem error prone. However I think using a
> > >>>> singleton
> > >>>>> list of [-1] might be better in terms of usability, I am saying
> this
> > >>>>> because the KIP also has a provision to return an empty list to
> refer
> > >> to
> > >>>>> dropping the record. So, an empty optional and an empty list have
> > >> totally
> > >>>>> different meanings which could get confusing.
> > >>>>>
> > >>>>> Let me know what you think.
> > >>>>>
> > >>>>> Thanks!
> > >>>>> Sagar.
> > >>>>>
> > >>>>>
> > >>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
> > >>>>> <ma...@aiven.io.invalid> wrote:
> > >>>>>
> > >>>>>> I also concur with this, having an Optional in the type makes it
> > very
> > >>>>>> clear what’s going on and better signifies an absence of value (or
> > in
> > >>>> this
> > >>>>>> case the broadcast value).
> > >>>>>>
> > >>>>>> --
> > >>>>>> Matthew de Detrich
> > >>>>>> Aiven Deutschland GmbH
> > >>>>>> Immanuelkirchstraße 26, 10405 Berlin
> > >>>>>> Amtsgericht Charlottenburg, HRB 209739 B
> > >>>>>>
> > >>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> > >>>>>> m: +491603708037
> > >>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
> > >>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
> > >>>>>>>
> > >>>>>>> 2.
> > >>>>>>> I would prefer changing the return type of partitions() to
> > >>>>>>> Optional<List<Integer>> and using Optional.empty() as the
> broadcast
> > >>>>>>> value. IMO, The chances that an implementation returns null due
> to
> > a
> > >>>> bug
> > >>>>>>> is much higher than that an implementation returns an empty
> > Optional
> > >>>> due
> > >>>>>>> to a bug.
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Chris Egerton <ch...@aiven.io.INVALID>.
+1 to Bruno's concerns about backward compatibility. Do we actually need
support for dropping records in the partitioner? It doesn't seem necessary
based on the motivation for the KIP. If we remove that feature, we could
handle null and/or empty lists by using the default partitioning,
equivalent to how we handle null return values from the existing partition
method today.

On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <ca...@apache.org> wrote:

> Hi Sagar,
>
> Thank you for the updates!
>
> I do not intend to prolong this vote thread more than needed, but I
> still have some points.
>
> The deprecated partition method can return null if the default
> partitioning logic of the producer should be used.
> With the new method partitions() it seems that it is not possible to use
> the default partitioning logic, anymore.
>
> Also, in the default implementation of method partitions(), a record
> that would use the default partitioning logic in method partition()
> would be dropped, which would break backward compatibility since Streams
> would always call the new method partitions() even though the users
> still implement the deprecated method partition().
>
> I have a last point that we should probably discuss on the PR and not on
> the KIP but since you added the code in the KIP I need to mention it. I
> do not think you should check the validity of the partition number since
> the ProducerRecord does the same check and throws an exception. If
> Streams adds the same check but does not throw, the behavior is not
> backward compatible.
>
> Best,
> Bruno
>
>
> On 30.08.22 12:43, Sagar wrote:
> > Thanks Bruno/Chris,
> >
> > Even I agree that might be better to keep it simple like the way Chris
> > suggested. I have updated the KIP accordingly. I made couple of minor
> > changes to the KIP:
> >
> > 1) One of them being the change of return type of partitions method from
> > List to Set. This is to ensure that in case the implementation of
> > StreamPartitoner is buggy and ends up returning duplicate
> > partition numbers, we won't have duplicates thereby not trying to send to
> > the same partition multiple times due to this.
> > 2) I also added a check to send the record only to valid partition
> numbers
> > and log and drop when the partition number is invalid. This is again to
> > prevent errors for cases when the StreamPartitioner implementation has
> some
> > bugs (since there are no validations as such).
> > 3) I also updated the Test Plan section based on the suggestion from
> Bruno.
> > 4) I updated the default implementation of partitions method based on the
> > great catch from Chris!
> >
> > Let me know if it looks fine now.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <ca...@apache.org>
> wrote:
> >
> >> Hi,
> >>
> >> I am favour of discarding the sugar for broadcasting and leave the
> >> broadcasting to the implementation as Chris suggests. I think that is
> >> the cleanest option.
> >>
> >> Best,
> >> Bruno
> >>
> >> On 29.08.22 19:50, Chris Egerton wrote:
> >>> Hi all,
> >>>
> >>> I think it'd be useful to be more explicit about broadcasting to all
> >> topic
> >>> partitions rather than add implicit behavior for empty cases (empty
> >>> optional, empty list, etc.). The suggested enum approach would address
> >> that
> >>> nicely.
> >>>
> >>> It's also worth noting that there's no hard requirement to add sugar
> for
> >>> broadcasting to all topic partitions since the API already provides the
> >>> number of topic partitions available when calling a stream partitioner.
> >> If
> >>> we can't find a clean way to add this support, it might be better to
> >> leave
> >>> it out and just let people implement this themselves with a line of
> Java
> >> 8
> >>> streams:
> >>>
> >>>       return IntStream.range(0,
> >>> numPartitions).boxed().collect(Collectors.toList());
> >>>
> >>> Also worth noting that there may be a bug in the default implementation
> >> for
> >>> the new StreamPartitioner::partitions method, since it doesn't appear
> to
> >>> correctly pick up on null return values from the partition method and
> >>> translate them into empty lists.
> >>>
> >>> Cheers,
> >>>
> >>> Chris
> >>>
> >>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <ca...@apache.org>
> >> wrote:
> >>>
> >>>> Hi Sagar,
> >>>>
> >>>> I do not see an issue with using an empty list together with an empty
> >>>> Optional. A non-empty Optional that contains an empty list would just
> >>>> say that there is no partition to which the record should be sent. If
> >>>> there is no partition, the record is dropped.
> >>>>
> >>>> An empty Optional might be a way to say, broadcast to all partitions.
> >>>>
> >>>> Alternatively -- to make it more explicit -- you could return an
> object
> >>>> with an enum and a list of partitions. The enum could have values
> SOME,
> >>>> ALL, and NONE. If the value is SOME, the list of partitions contains
> the
> >>>> partitions to which to send the record. If the value of the enum is
> ALL
> >>>> or NONE, the list of partitions is not used and might be even null
> >>>> (since in that case the list should not be used and it would be a bug
> to
> >>>> use it).
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 24.08.22 20:11, Sagar wrote:
> >>>>> Thank you Bruno/Matthew for your comments.
> >>>>>
> >>>>> I agree using null does seem error prone. However I think using a
> >>>> singleton
> >>>>> list of [-1] might be better in terms of usability, I am saying this
> >>>>> because the KIP also has a provision to return an empty list to refer
> >> to
> >>>>> dropping the record. So, an empty optional and an empty list have
> >> totally
> >>>>> different meanings which could get confusing.
> >>>>>
> >>>>> Let me know what you think.
> >>>>>
> >>>>> Thanks!
> >>>>> Sagar.
> >>>>>
> >>>>>
> >>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
> >>>>> <ma...@aiven.io.invalid> wrote:
> >>>>>
> >>>>>> I also concur with this, having an Optional in the type makes it
> very
> >>>>>> clear what’s going on and better signifies an absence of value (or
> in
> >>>> this
> >>>>>> case the broadcast value).
> >>>>>>
> >>>>>> --
> >>>>>> Matthew de Detrich
> >>>>>> Aiven Deutschland GmbH
> >>>>>> Immanuelkirchstraße 26, 10405 Berlin
> >>>>>> Amtsgericht Charlottenburg, HRB 209739 B
> >>>>>>
> >>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> >>>>>> m: +491603708037
> >>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
> >>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
> >>>>>>>
> >>>>>>> 2.
> >>>>>>> I would prefer changing the return type of partitions() to
> >>>>>>> Optional<List<Integer>> and using Optional.empty() as the broadcast
> >>>>>>> value. IMO, The chances that an implementation returns null due to
> a
> >>>> bug
> >>>>>>> is much higher than that an implementation returns an empty
> Optional
> >>>> due
> >>>>>>> to a bug.
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Bruno Cadonna <ca...@apache.org>.
Hi Sagar,

Thank you for the updates!

I do not intend to prolong this vote thread more than needed, but I 
still have some points.

The deprecated partition method can return null if the default 
partitioning logic of the producer should be used.
With the new method partitions() it seems that it is not possible to use 
the default partitioning logic, anymore.

Also, in the default implementation of method partitions(), a record 
that would use the default partitioning logic in method partition() 
would be dropped, which would break backward compatibility since Streams 
would always call the new method partitions() even though the users 
still implement the deprecated method partition().

I have a last point that we should probably discuss on the PR and not on 
the KIP but since you added the code in the KIP I need to mention it. I 
do not think you should check the validity of the partition number since 
the ProducerRecord does the same check and throws an exception. If 
Streams adds the same check but does not throw, the behavior is not 
backward compatible.

Best,
Bruno


On 30.08.22 12:43, Sagar wrote:
> Thanks Bruno/Chris,
> 
> Even I agree that might be better to keep it simple like the way Chris
> suggested. I have updated the KIP accordingly. I made couple of minor
> changes to the KIP:
> 
> 1) One of them being the change of return type of partitions method from
> List to Set. This is to ensure that in case the implementation of
> StreamPartitoner is buggy and ends up returning duplicate
> partition numbers, we won't have duplicates thereby not trying to send to
> the same partition multiple times due to this.
> 2) I also added a check to send the record only to valid partition numbers
> and log and drop when the partition number is invalid. This is again to
> prevent errors for cases when the StreamPartitioner implementation has some
> bugs (since there are no validations as such).
> 3) I also updated the Test Plan section based on the suggestion from Bruno.
> 4) I updated the default implementation of partitions method based on the
> great catch from Chris!
> 
> Let me know if it looks fine now.
> 
> Thanks!
> Sagar.
> 
> 
> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <ca...@apache.org> wrote:
> 
>> Hi,
>>
>> I am favour of discarding the sugar for broadcasting and leave the
>> broadcasting to the implementation as Chris suggests. I think that is
>> the cleanest option.
>>
>> Best,
>> Bruno
>>
>> On 29.08.22 19:50, Chris Egerton wrote:
>>> Hi all,
>>>
>>> I think it'd be useful to be more explicit about broadcasting to all
>> topic
>>> partitions rather than add implicit behavior for empty cases (empty
>>> optional, empty list, etc.). The suggested enum approach would address
>> that
>>> nicely.
>>>
>>> It's also worth noting that there's no hard requirement to add sugar for
>>> broadcasting to all topic partitions since the API already provides the
>>> number of topic partitions available when calling a stream partitioner.
>> If
>>> we can't find a clean way to add this support, it might be better to
>> leave
>>> it out and just let people implement this themselves with a line of Java
>> 8
>>> streams:
>>>
>>>       return IntStream.range(0,
>>> numPartitions).boxed().collect(Collectors.toList());
>>>
>>> Also worth noting that there may be a bug in the default implementation
>> for
>>> the new StreamPartitioner::partitions method, since it doesn't appear to
>>> correctly pick up on null return values from the partition method and
>>> translate them into empty lists.
>>>
>>> Cheers,
>>>
>>> Chris
>>>
>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <ca...@apache.org>
>> wrote:
>>>
>>>> Hi Sagar,
>>>>
>>>> I do not see an issue with using an empty list together with an empty
>>>> Optional. A non-empty Optional that contains an empty list would just
>>>> say that there is no partition to which the record should be sent. If
>>>> there is no partition, the record is dropped.
>>>>
>>>> An empty Optional might be a way to say, broadcast to all partitions.
>>>>
>>>> Alternatively -- to make it more explicit -- you could return an object
>>>> with an enum and a list of partitions. The enum could have values SOME,
>>>> ALL, and NONE. If the value is SOME, the list of partitions contains the
>>>> partitions to which to send the record. If the value of the enum is ALL
>>>> or NONE, the list of partitions is not used and might be even null
>>>> (since in that case the list should not be used and it would be a bug to
>>>> use it).
>>>>
>>>> Best,
>>>> Bruno
>>>>
>>>> On 24.08.22 20:11, Sagar wrote:
>>>>> Thank you Bruno/Matthew for your comments.
>>>>>
>>>>> I agree using null does seem error prone. However I think using a
>>>> singleton
>>>>> list of [-1] might be better in terms of usability, I am saying this
>>>>> because the KIP also has a provision to return an empty list to refer
>> to
>>>>> dropping the record. So, an empty optional and an empty list have
>> totally
>>>>> different meanings which could get confusing.
>>>>>
>>>>> Let me know what you think.
>>>>>
>>>>> Thanks!
>>>>> Sagar.
>>>>>
>>>>>
>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
>>>>> <ma...@aiven.io.invalid> wrote:
>>>>>
>>>>>> I also concur with this, having an Optional in the type makes it very
>>>>>> clear what’s going on and better signifies an absence of value (or in
>>>> this
>>>>>> case the broadcast value).
>>>>>>
>>>>>> --
>>>>>> Matthew de Detrich
>>>>>> Aiven Deutschland GmbH
>>>>>> Immanuelkirchstraße 26, 10405 Berlin
>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
>>>>>>
>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>>>>>> m: +491603708037
>>>>>> w: aiven.io e: matthew.dedetrich@aiven.io
>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
>>>>>>>
>>>>>>> 2.
>>>>>>> I would prefer changing the return type of partitions() to
>>>>>>> Optional<List<Integer>> and using Optional.empty() as the broadcast
>>>>>>> value. IMO, The chances that an implementation returns null due to a
>>>> bug
>>>>>>> is much higher than that an implementation returns an empty Optional
>>>> due
>>>>>>> to a bug.
>>>>>>
>>>>>
>>>>
>>>
>>
> 

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sagar <sa...@gmail.com>.
Thanks Bruno/Chris,

Even I agree that might be better to keep it simple like the way Chris
suggested. I have updated the KIP accordingly. I made couple of minor
changes to the KIP:

1) One of them being the change of return type of partitions method from
List to Set. This is to ensure that in case the implementation of
StreamPartitoner is buggy and ends up returning duplicate
partition numbers, we won't have duplicates thereby not trying to send to
the same partition multiple times due to this.
2) I also added a check to send the record only to valid partition numbers
and log and drop when the partition number is invalid. This is again to
prevent errors for cases when the StreamPartitioner implementation has some
bugs (since there are no validations as such).
3) I also updated the Test Plan section based on the suggestion from Bruno.
4) I updated the default implementation of partitions method based on the
great catch from Chris!

Let me know if it looks fine now.

Thanks!
Sagar.


On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <ca...@apache.org> wrote:

> Hi,
>
> I am favour of discarding the sugar for broadcasting and leave the
> broadcasting to the implementation as Chris suggests. I think that is
> the cleanest option.
>
> Best,
> Bruno
>
> On 29.08.22 19:50, Chris Egerton wrote:
> > Hi all,
> >
> > I think it'd be useful to be more explicit about broadcasting to all
> topic
> > partitions rather than add implicit behavior for empty cases (empty
> > optional, empty list, etc.). The suggested enum approach would address
> that
> > nicely.
> >
> > It's also worth noting that there's no hard requirement to add sugar for
> > broadcasting to all topic partitions since the API already provides the
> > number of topic partitions available when calling a stream partitioner.
> If
> > we can't find a clean way to add this support, it might be better to
> leave
> > it out and just let people implement this themselves with a line of Java
> 8
> > streams:
> >
> >      return IntStream.range(0,
> > numPartitions).boxed().collect(Collectors.toList());
> >
> > Also worth noting that there may be a bug in the default implementation
> for
> > the new StreamPartitioner::partitions method, since it doesn't appear to
> > correctly pick up on null return values from the partition method and
> > translate them into empty lists.
> >
> > Cheers,
> >
> > Chris
> >
> > On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <ca...@apache.org>
> wrote:
> >
> >> Hi Sagar,
> >>
> >> I do not see an issue with using an empty list together with an empty
> >> Optional. A non-empty Optional that contains an empty list would just
> >> say that there is no partition to which the record should be sent. If
> >> there is no partition, the record is dropped.
> >>
> >> An empty Optional might be a way to say, broadcast to all partitions.
> >>
> >> Alternatively -- to make it more explicit -- you could return an object
> >> with an enum and a list of partitions. The enum could have values SOME,
> >> ALL, and NONE. If the value is SOME, the list of partitions contains the
> >> partitions to which to send the record. If the value of the enum is ALL
> >> or NONE, the list of partitions is not used and might be even null
> >> (since in that case the list should not be used and it would be a bug to
> >> use it).
> >>
> >> Best,
> >> Bruno
> >>
> >> On 24.08.22 20:11, Sagar wrote:
> >>> Thank you Bruno/Matthew for your comments.
> >>>
> >>> I agree using null does seem error prone. However I think using a
> >> singleton
> >>> list of [-1] might be better in terms of usability, I am saying this
> >>> because the KIP also has a provision to return an empty list to refer
> to
> >>> dropping the record. So, an empty optional and an empty list have
> totally
> >>> different meanings which could get confusing.
> >>>
> >>> Let me know what you think.
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>>
> >>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
> >>> <ma...@aiven.io.invalid> wrote:
> >>>
> >>>> I also concur with this, having an Optional in the type makes it very
> >>>> clear what’s going on and better signifies an absence of value (or in
> >> this
> >>>> case the broadcast value).
> >>>>
> >>>> --
> >>>> Matthew de Detrich
> >>>> Aiven Deutschland GmbH
> >>>> Immanuelkirchstraße 26, 10405 Berlin
> >>>> Amtsgericht Charlottenburg, HRB 209739 B
> >>>>
> >>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> >>>> m: +491603708037
> >>>> w: aiven.io e: matthew.dedetrich@aiven.io
> >>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
> >>>>>
> >>>>> 2.
> >>>>> I would prefer changing the return type of partitions() to
> >>>>> Optional<List<Integer>> and using Optional.empty() as the broadcast
> >>>>> value. IMO, The chances that an implementation returns null due to a
> >> bug
> >>>>> is much higher than that an implementation returns an empty Optional
> >> due
> >>>>> to a bug.
> >>>>
> >>>
> >>
> >
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Bruno Cadonna <ca...@apache.org>.
Hi,

I am favour of discarding the sugar for broadcasting and leave the 
broadcasting to the implementation as Chris suggests. I think that is 
the cleanest option.

Best,
Bruno

On 29.08.22 19:50, Chris Egerton wrote:
> Hi all,
> 
> I think it'd be useful to be more explicit about broadcasting to all topic
> partitions rather than add implicit behavior for empty cases (empty
> optional, empty list, etc.). The suggested enum approach would address that
> nicely.
> 
> It's also worth noting that there's no hard requirement to add sugar for
> broadcasting to all topic partitions since the API already provides the
> number of topic partitions available when calling a stream partitioner. If
> we can't find a clean way to add this support, it might be better to leave
> it out and just let people implement this themselves with a line of Java 8
> streams:
> 
>      return IntStream.range(0,
> numPartitions).boxed().collect(Collectors.toList());
> 
> Also worth noting that there may be a bug in the default implementation for
> the new StreamPartitioner::partitions method, since it doesn't appear to
> correctly pick up on null return values from the partition method and
> translate them into empty lists.
> 
> Cheers,
> 
> Chris
> 
> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <ca...@apache.org> wrote:
> 
>> Hi Sagar,
>>
>> I do not see an issue with using an empty list together with an empty
>> Optional. A non-empty Optional that contains an empty list would just
>> say that there is no partition to which the record should be sent. If
>> there is no partition, the record is dropped.
>>
>> An empty Optional might be a way to say, broadcast to all partitions.
>>
>> Alternatively -- to make it more explicit -- you could return an object
>> with an enum and a list of partitions. The enum could have values SOME,
>> ALL, and NONE. If the value is SOME, the list of partitions contains the
>> partitions to which to send the record. If the value of the enum is ALL
>> or NONE, the list of partitions is not used and might be even null
>> (since in that case the list should not be used and it would be a bug to
>> use it).
>>
>> Best,
>> Bruno
>>
>> On 24.08.22 20:11, Sagar wrote:
>>> Thank you Bruno/Matthew for your comments.
>>>
>>> I agree using null does seem error prone. However I think using a
>> singleton
>>> list of [-1] might be better in terms of usability, I am saying this
>>> because the KIP also has a provision to return an empty list to refer to
>>> dropping the record. So, an empty optional and an empty list have totally
>>> different meanings which could get confusing.
>>>
>>> Let me know what you think.
>>>
>>> Thanks!
>>> Sagar.
>>>
>>>
>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
>>> <ma...@aiven.io.invalid> wrote:
>>>
>>>> I also concur with this, having an Optional in the type makes it very
>>>> clear what’s going on and better signifies an absence of value (or in
>> this
>>>> case the broadcast value).
>>>>
>>>> --
>>>> Matthew de Detrich
>>>> Aiven Deutschland GmbH
>>>> Immanuelkirchstraße 26, 10405 Berlin
>>>> Amtsgericht Charlottenburg, HRB 209739 B
>>>>
>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>>>> m: +491603708037
>>>> w: aiven.io e: matthew.dedetrich@aiven.io
>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
>>>>>
>>>>> 2.
>>>>> I would prefer changing the return type of partitions() to
>>>>> Optional<List<Integer>> and using Optional.empty() as the broadcast
>>>>> value. IMO, The chances that an implementation returns null due to a
>> bug
>>>>> is much higher than that an implementation returns an empty Optional
>> due
>>>>> to a bug.
>>>>
>>>
>>
> 

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Chris Egerton <ch...@aiven.io.INVALID>.
Hi all,

I think it'd be useful to be more explicit about broadcasting to all topic
partitions rather than add implicit behavior for empty cases (empty
optional, empty list, etc.). The suggested enum approach would address that
nicely.

It's also worth noting that there's no hard requirement to add sugar for
broadcasting to all topic partitions since the API already provides the
number of topic partitions available when calling a stream partitioner. If
we can't find a clean way to add this support, it might be better to leave
it out and just let people implement this themselves with a line of Java 8
streams:

    return IntStream.range(0,
numPartitions).boxed().collect(Collectors.toList());

Also worth noting that there may be a bug in the default implementation for
the new StreamPartitioner::partitions method, since it doesn't appear to
correctly pick up on null return values from the partition method and
translate them into empty lists.

Cheers,

Chris

On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <ca...@apache.org> wrote:

> Hi Sagar,
>
> I do not see an issue with using an empty list together with an empty
> Optional. A non-empty Optional that contains an empty list would just
> say that there is no partition to which the record should be sent. If
> there is no partition, the record is dropped.
>
> An empty Optional might be a way to say, broadcast to all partitions.
>
> Alternatively -- to make it more explicit -- you could return an object
> with an enum and a list of partitions. The enum could have values SOME,
> ALL, and NONE. If the value is SOME, the list of partitions contains the
> partitions to which to send the record. If the value of the enum is ALL
> or NONE, the list of partitions is not used and might be even null
> (since in that case the list should not be used and it would be a bug to
> use it).
>
> Best,
> Bruno
>
> On 24.08.22 20:11, Sagar wrote:
> > Thank you Bruno/Matthew for your comments.
> >
> > I agree using null does seem error prone. However I think using a
> singleton
> > list of [-1] might be better in terms of usability, I am saying this
> > because the KIP also has a provision to return an empty list to refer to
> > dropping the record. So, an empty optional and an empty list have totally
> > different meanings which could get confusing.
> >
> > Let me know what you think.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
> > <ma...@aiven.io.invalid> wrote:
> >
> >> I also concur with this, having an Optional in the type makes it very
> >> clear what’s going on and better signifies an absence of value (or in
> this
> >> case the broadcast value).
> >>
> >> --
> >> Matthew de Detrich
> >> Aiven Deutschland GmbH
> >> Immanuelkirchstraße 26, 10405 Berlin
> >> Amtsgericht Charlottenburg, HRB 209739 B
> >>
> >> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> >> m: +491603708037
> >> w: aiven.io e: matthew.dedetrich@aiven.io
> >> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
> >>>
> >>> 2.
> >>> I would prefer changing the return type of partitions() to
> >>> Optional<List<Integer>> and using Optional.empty() as the broadcast
> >>> value. IMO, The chances that an implementation returns null due to a
> bug
> >>> is much higher than that an implementation returns an empty Optional
> due
> >>> to a bug.
> >>
> >
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Bruno Cadonna <ca...@apache.org>.
Hi Sagar,

I do not see an issue with using an empty list together with an empty 
Optional. A non-empty Optional that contains an empty list would just 
say that there is no partition to which the record should be sent. If 
there is no partition, the record is dropped.

An empty Optional might be a way to say, broadcast to all partitions.

Alternatively -- to make it more explicit -- you could return an object 
with an enum and a list of partitions. The enum could have values SOME, 
ALL, and NONE. If the value is SOME, the list of partitions contains the 
partitions to which to send the record. If the value of the enum is ALL 
or NONE, the list of partitions is not used and might be even null 
(since in that case the list should not be used and it would be a bug to 
use it).

Best,
Bruno

On 24.08.22 20:11, Sagar wrote:
> Thank you Bruno/Matthew for your comments.
> 
> I agree using null does seem error prone. However I think using a singleton
> list of [-1] might be better in terms of usability, I am saying this
> because the KIP also has a provision to return an empty list to refer to
> dropping the record. So, an empty optional and an empty list have totally
> different meanings which could get confusing.
> 
> Let me know what you think.
> 
> Thanks!
> Sagar.
> 
> 
> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
> <ma...@aiven.io.invalid> wrote:
> 
>> I also concur with this, having an Optional in the type makes it very
>> clear what’s going on and better signifies an absence of value (or in this
>> case the broadcast value).
>>
>> --
>> Matthew de Detrich
>> Aiven Deutschland GmbH
>> Immanuelkirchstraße 26, 10405 Berlin
>> Amtsgericht Charlottenburg, HRB 209739 B
>>
>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>> m: +491603708037
>> w: aiven.io e: matthew.dedetrich@aiven.io
>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
>>>
>>> 2.
>>> I would prefer changing the return type of partitions() to
>>> Optional<List<Integer>> and using Optional.empty() as the broadcast
>>> value. IMO, The chances that an implementation returns null due to a bug
>>> is much higher than that an implementation returns an empty Optional due
>>> to a bug.
>>
> 

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sagar <sa...@gmail.com>.
Thank you Bruno/Matthew for your comments.

I agree using null does seem error prone. However I think using a singleton
list of [-1] might be better in terms of usability, I am saying this
because the KIP also has a provision to return an empty list to refer to
dropping the record. So, an empty optional and an empty list have totally
different meanings which could get confusing.

Let me know what you think.

Thanks!
Sagar.


On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
<ma...@aiven.io.invalid> wrote:

> I also concur with this, having an Optional in the type makes it very
> clear what’s going on and better signifies an absence of value (or in this
> case the broadcast value).
>
> --
> Matthew de Detrich
> Aiven Deutschland GmbH
> Immanuelkirchstraße 26, 10405 Berlin
> Amtsgericht Charlottenburg, HRB 209739 B
>
> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> m: +491603708037
> w: aiven.io e: matthew.dedetrich@aiven.io
> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
> >
> > 2.
> > I would prefer changing the return type of partitions() to
> > Optional<List<Integer>> and using Optional.empty() as the broadcast
> > value. IMO, The chances that an implementation returns null due to a bug
> > is much higher than that an implementation returns an empty Optional due
> > to a bug.
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Matthew Benedict de Detrich <ma...@aiven.io.INVALID>.
I also concur with this, having an Optional in the type makes it very clear what’s going on and better signifies an absence of value (or in this case the broadcast value).

--
Matthew de Detrich
Aiven Deutschland GmbH
Immanuelkirchstraße 26, 10405 Berlin
Amtsgericht Charlottenburg, HRB 209739 B

Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
m: +491603708037
w: aiven.io e: matthew.dedetrich@aiven.io
On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
>
> 2.
> I would prefer changing the return type of partitions() to
> Optional<List<Integer>> and using Optional.empty() as the broadcast
> value. IMO, The chances that an implementation returns null due to a bug
> is much higher than that an implementation returns an empty Optional due
> to a bug.

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Bruno Cadonna <ca...@apache.org>.
Hi Sagar,

Thank you for the KIP and sorry for being late to the party!

1.
The java docs for partitions() say:

"Note that returning a single valued list with value -1 is a shorthand 
for broadcasting the record to all the partitions of the topic."

I guess that is not true anymore since the code in the "Proposed 
Changes" section checks for null to decide about broadcasting.

2.
I would prefer changing the return type of partitions() to 
Optional<List<Integer>> and using Optional.empty() as the broadcast 
value. IMO, The chances that an implementation returns null due to a bug 
is much higher than that an implementation returns an empty Optional due 
to a bug. I would also be fine with a singleton list with a -1 as you 
describe in the java docs.

3.
Recently a "Test Plan" section was added to the KIP template. Your KIP 
is missing this section.


Best,
Bruno

On 24.08.22 00:22, Sophie Blee-Goldman wrote:
> Thanks for the updates, it reads much more clearly to me now. Looks great
> 
> +1 (binding)
> 
> Cheers,
> Sophie
> 
> On Fri, Aug 19, 2022 at 1:42 AM Sagar <sa...@gmail.com> wrote:
> 
>> Thanks Sophie for the review. I see the confusion. As you pointed out, the
>> problem the KIP is trying to solve is not the avoidance of a custom
>> partitioner. Instead the process of sending or replicating the message N
>> times and then having the record wired through via a new custom partitioner
>> for every replication. That's what I tried to convey in the motivation
>> section. I updated the motivation slightly, let me know if that sounds ok.
>>
>> Also, yes the dropping of records using a custom partitioner is an added
>> benefit that we get. I think the custom partitioner bit is important as one
>> can always filter the records out initially.
>>
>> Let me know if this looks ok?
>>
>> Thanks!
>> Sagar.
>>
>> On Fri, Aug 19, 2022 at 10:17 AM Sophie Blee-Goldman
>> <so...@confluent.io.invalid> wrote:
>>
>>> Thanks Sagar -- one thing I'm still confused about, and sorry to keep
>>> pushing on this, but the example
>>> you gave for how this works in today's world seems not to correspond to
>> the
>>> method described in the
>>> text of the Motivation exception, ie
>>>
>>> Currently, if a user wants to replicate a message into N partitions, the
>>>> only way of doing that is to replicate the message N times and then
>>> plug-in
>>>> a custom partitioner to write the message N times into N different
>>>> partitions.
>>>
>>>
>>>
>>>   This seems a little cumbersome way to broadcast. Also, there seems to be
>>>> no way of dropping a record within the partitioner. This KIP aims to
>> make
>>>> this process simpler in Kafka Streams.
>>>
>>>
>>> It sounds like you're saying the problem this KIP is fixing is that the
>>> only way to do this is by implementing
>>> a custom partitioner and that this is cumbersome, but that's actually
>>> exactly what this KIP is doing: providing
>>> a method od multi-casting via implementing a custom partitioner (as seen
>> in
>>> the example usage you provided).
>>> Thanks to your examples I think I now understand better what the KIP is
>>> doing, and assume what's written in
>>> the motivation section is just a type/mistake -- can you confirm?
>>>
>>> That said, the claim about having "no way of dropping a record within the
>>> partitioner" does actually seem to be
>>> correct, that is you couldn't do it with a custom partitioner prior to
>> this
>>> KIP and now you can. I would consider
>>> that a secondary/additional improvement that these changes provide, but
>>> it's not strictly speaking related to
>>>   multi-casting, right? (Just checking my understanding, not challenging
>>> anything about this)
>>>
>>> Cheers,
>>> Sophie
>>>
>>> On Thu, Aug 18, 2022 at 7:27 AM Sagar <sa...@gmail.com> wrote:
>>>
>>>> Hello Sophie,
>>>>
>>>> Thanks for your feedback. I have made all the suggested changes.
>>>>
>>>> One note, on how users can accomplish this in today's world , I have
>> made
>>>> up this example and have never tried myself before. But I am assuming
>> it
>>>> will work.
>>>>
>>>> Let me know what you think.
>>>>
>>>> Thanks!
>>>> Sagar.
>>>>
>>>>
>>>> On Thu, Aug 18, 2022 at 7:17 AM Sophie Blee-Goldman
>>>> <so...@confluent.io.invalid> wrote:
>>>>
>>>>> Hey Sagar, thanks for the KIP!
>>>>>
>>>>> Just some cosmetic points to make it absolutely clear what this KIP
>> is
>>>>> doing:
>>>>> 1) could you clarify up front in the Motivation section that this is
>>>>> focused on Kafka Streams applications, and not the plain Producer
>>> client?
>>>>> 2) you included the entire implementation of the `#send` method to
>>>>> demonstrate the change in logic, but can you either remove the parts
>> of
>>>>> the implementation that aren't being touched here or at least
>> highlight
>>>> in
>>>>> some way the specific lines that have changed?
>>>>> 3) In general the implementation is, well, an implementation detail
>>> that
>>>>> doesn't need to be included in the KIP, but it's ok -- always nice to
>>>> get a
>>>>> sense of how things will work internally. But what I think would be
>>> more
>>>>> useful to show in the KIP is how things will work with the new public
>>>>> interface -- ie, can you provide a brief example of how a user would
>> go
>>>>> about taking advantage of this new interface? Even better, include an
>>>>> example of what it takes for a user to accomplish this behavior
>> before
>>>> this
>>>>> KIP. It would help showcase the concrete benefit this KIP is bringing
>>> and
>>>>> anchor the motivation section a bit better.
>>>>>
>>>>> Also nit: in the 2nd sentence under Public Interfaces I think it
>> should
>>>> say
>>>>> "it would invoke the partition()  method" -- ie this should be
>>>>> "partition()" not "partition*s*()"
>>>>>
>>>>> Other than that this looks good, but I'll wait until you've addressed
>>> the
>>>>> above to cast a vote.
>>>>>
>>>>> Thanks!
>>>>> Sophie
>>>>>
>>>>> On Fri, Aug 12, 2022 at 10:36 PM Sagar <sa...@gmail.com>
>>>> wrote:
>>>>>
>>>>>> Hey John,
>>>>>>
>>>>>> Thanks for the vote. I added the reason for the rejection of the
>>>>>> alternatives. The first one is basically an option to broadcast to
>>> all
>>>>>> partitions which I felt was restrictive. Instead the KIP allows
>>>>>> multicasting to 0-N partitions based upon the partitioner
>>>> implementation.
>>>>>>
>>>>>> Thanks!
>>>>>> Sagar.
>>>>>>
>>>>>> On Sat, Aug 13, 2022 at 7:35 AM John Roesler <vv...@apache.org>
>>>>> wrote:
>>>>>>
>>>>>>> Thanks, Sagar!
>>>>>>>
>>>>>>> I’m +1 (binding)
>>>>>>>
>>>>>>> Can you add a short explanation to each rejected alternative? I
>> was
>>>>>>> wondering why we wouldn’t provide an overloaded to()/addSink()
>> (the
>>>>> first
>>>>>>> rejected alternative), and I had to look back at the Streams code
>>> to
>>>>> see
>>>>>>> that they both already accept the partitioner (I thought it was a
>>>>>> config).
>>>>>>>
>>>>>>> Thanks!
>>>>>>> -John
>>>>>>>
>>>>>>> On Tue, Aug 9, 2022, at 13:44, Walker Carlson wrote:
>>>>>>>> +1 (non binding)
>>>>>>>>
>>>>>>>> Walker
>>>>>>>>
>>>>>>>> On Tue, May 31, 2022 at 4:44 AM Sagar <
>> sagarmeansocean@gmail.com
>>>>
>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I would like to start a voting thread on
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
>>>>>>>>> .
>>>>>>>>>
>>>>>>>>> I am just starting this as the discussion thread has been open
>>> for
>>>>> 10+
>>>>>>>>> days. In case there are some comments, we can always discuss
>>> them
>>>>> over
>>>>>>>>> there.
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> Sagar.
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> 

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sophie Blee-Goldman <so...@confluent.io.INVALID>.
Thanks for the updates, it reads much more clearly to me now. Looks great

+1 (binding)

Cheers,
Sophie

On Fri, Aug 19, 2022 at 1:42 AM Sagar <sa...@gmail.com> wrote:

> Thanks Sophie for the review. I see the confusion. As you pointed out, the
> problem the KIP is trying to solve is not the avoidance of a custom
> partitioner. Instead the process of sending or replicating the message N
> times and then having the record wired through via a new custom partitioner
> for every replication. That's what I tried to convey in the motivation
> section. I updated the motivation slightly, let me know if that sounds ok.
>
> Also, yes the dropping of records using a custom partitioner is an added
> benefit that we get. I think the custom partitioner bit is important as one
> can always filter the records out initially.
>
> Let me know if this looks ok?
>
> Thanks!
> Sagar.
>
> On Fri, Aug 19, 2022 at 10:17 AM Sophie Blee-Goldman
> <so...@confluent.io.invalid> wrote:
>
> > Thanks Sagar -- one thing I'm still confused about, and sorry to keep
> > pushing on this, but the example
> > you gave for how this works in today's world seems not to correspond to
> the
> > method described in the
> > text of the Motivation exception, ie
> >
> > Currently, if a user wants to replicate a message into N partitions, the
> > > only way of doing that is to replicate the message N times and then
> > plug-in
> > > a custom partitioner to write the message N times into N different
> > > partitions.
> >
> >
> >
> >  This seems a little cumbersome way to broadcast. Also, there seems to be
> > > no way of dropping a record within the partitioner. This KIP aims to
> make
> > > this process simpler in Kafka Streams.
> >
> >
> > It sounds like you're saying the problem this KIP is fixing is that the
> > only way to do this is by implementing
> > a custom partitioner and that this is cumbersome, but that's actually
> > exactly what this KIP is doing: providing
> > a method od multi-casting via implementing a custom partitioner (as seen
> in
> > the example usage you provided).
> > Thanks to your examples I think I now understand better what the KIP is
> > doing, and assume what's written in
> > the motivation section is just a type/mistake -- can you confirm?
> >
> > That said, the claim about having "no way of dropping a record within the
> > partitioner" does actually seem to be
> > correct, that is you couldn't do it with a custom partitioner prior to
> this
> > KIP and now you can. I would consider
> > that a secondary/additional improvement that these changes provide, but
> > it's not strictly speaking related to
> >  multi-casting, right? (Just checking my understanding, not challenging
> > anything about this)
> >
> > Cheers,
> > Sophie
> >
> > On Thu, Aug 18, 2022 at 7:27 AM Sagar <sa...@gmail.com> wrote:
> >
> > > Hello Sophie,
> > >
> > > Thanks for your feedback. I have made all the suggested changes.
> > >
> > > One note, on how users can accomplish this in today's world , I have
> made
> > > up this example and have never tried myself before. But I am assuming
> it
> > > will work.
> > >
> > > Let me know what you think.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > >
> > > On Thu, Aug 18, 2022 at 7:17 AM Sophie Blee-Goldman
> > > <so...@confluent.io.invalid> wrote:
> > >
> > > > Hey Sagar, thanks for the KIP!
> > > >
> > > > Just some cosmetic points to make it absolutely clear what this KIP
> is
> > > > doing:
> > > > 1) could you clarify up front in the Motivation section that this is
> > > > focused on Kafka Streams applications, and not the plain Producer
> > client?
> > > > 2) you included the entire implementation of the `#send` method to
> > > > demonstrate the change in logic, but can you either remove the parts
> of
> > > > the implementation that aren't being touched here or at least
> highlight
> > > in
> > > > some way the specific lines that have changed?
> > > > 3) In general the implementation is, well, an implementation detail
> > that
> > > > doesn't need to be included in the KIP, but it's ok -- always nice to
> > > get a
> > > > sense of how things will work internally. But what I think would be
> > more
> > > > useful to show in the KIP is how things will work with the new public
> > > > interface -- ie, can you provide a brief example of how a user would
> go
> > > > about taking advantage of this new interface? Even better, include an
> > > > example of what it takes for a user to accomplish this behavior
> before
> > > this
> > > > KIP. It would help showcase the concrete benefit this KIP is bringing
> > and
> > > > anchor the motivation section a bit better.
> > > >
> > > > Also nit: in the 2nd sentence under Public Interfaces I think it
> should
> > > say
> > > > "it would invoke the partition()  method" -- ie this should be
> > > > "partition()" not "partition*s*()"
> > > >
> > > > Other than that this looks good, but I'll wait until you've addressed
> > the
> > > > above to cast a vote.
> > > >
> > > > Thanks!
> > > > Sophie
> > > >
> > > > On Fri, Aug 12, 2022 at 10:36 PM Sagar <sa...@gmail.com>
> > > wrote:
> > > >
> > > > > Hey John,
> > > > >
> > > > > Thanks for the vote. I added the reason for the rejection of the
> > > > > alternatives. The first one is basically an option to broadcast to
> > all
> > > > > partitions which I felt was restrictive. Instead the KIP allows
> > > > > multicasting to 0-N partitions based upon the partitioner
> > > implementation.
> > > > >
> > > > > Thanks!
> > > > > Sagar.
> > > > >
> > > > > On Sat, Aug 13, 2022 at 7:35 AM John Roesler <vv...@apache.org>
> > > > wrote:
> > > > >
> > > > > > Thanks, Sagar!
> > > > > >
> > > > > > I’m +1 (binding)
> > > > > >
> > > > > > Can you add a short explanation to each rejected alternative? I
> was
> > > > > > wondering why we wouldn’t provide an overloaded to()/addSink()
> (the
> > > > first
> > > > > > rejected alternative), and I had to look back at the Streams code
> > to
> > > > see
> > > > > > that they both already accept the partitioner (I thought it was a
> > > > > config).
> > > > > >
> > > > > > Thanks!
> > > > > > -John
> > > > > >
> > > > > > On Tue, Aug 9, 2022, at 13:44, Walker Carlson wrote:
> > > > > > > +1 (non binding)
> > > > > > >
> > > > > > > Walker
> > > > > > >
> > > > > > > On Tue, May 31, 2022 at 4:44 AM Sagar <
> sagarmeansocean@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > >> Hi All,
> > > > > > >>
> > > > > > >> I would like to start a voting thread on
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> > > > > > >> .
> > > > > > >>
> > > > > > >> I am just starting this as the discussion thread has been open
> > for
> > > > 10+
> > > > > > >> days. In case there are some comments, we can always discuss
> > them
> > > > over
> > > > > > >> there.
> > > > > > >>
> > > > > > >> Thanks!
> > > > > > >> Sagar.
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sagar <sa...@gmail.com>.
Thanks Sophie for the review. I see the confusion. As you pointed out, the
problem the KIP is trying to solve is not the avoidance of a custom
partitioner. Instead the process of sending or replicating the message N
times and then having the record wired through via a new custom partitioner
for every replication. That's what I tried to convey in the motivation
section. I updated the motivation slightly, let me know if that sounds ok.

Also, yes the dropping of records using a custom partitioner is an added
benefit that we get. I think the custom partitioner bit is important as one
can always filter the records out initially.

Let me know if this looks ok?

Thanks!
Sagar.

On Fri, Aug 19, 2022 at 10:17 AM Sophie Blee-Goldman
<so...@confluent.io.invalid> wrote:

> Thanks Sagar -- one thing I'm still confused about, and sorry to keep
> pushing on this, but the example
> you gave for how this works in today's world seems not to correspond to the
> method described in the
> text of the Motivation exception, ie
>
> Currently, if a user wants to replicate a message into N partitions, the
> > only way of doing that is to replicate the message N times and then
> plug-in
> > a custom partitioner to write the message N times into N different
> > partitions.
>
>
>
>  This seems a little cumbersome way to broadcast. Also, there seems to be
> > no way of dropping a record within the partitioner. This KIP aims to make
> > this process simpler in Kafka Streams.
>
>
> It sounds like you're saying the problem this KIP is fixing is that the
> only way to do this is by implementing
> a custom partitioner and that this is cumbersome, but that's actually
> exactly what this KIP is doing: providing
> a method od multi-casting via implementing a custom partitioner (as seen in
> the example usage you provided).
> Thanks to your examples I think I now understand better what the KIP is
> doing, and assume what's written in
> the motivation section is just a type/mistake -- can you confirm?
>
> That said, the claim about having "no way of dropping a record within the
> partitioner" does actually seem to be
> correct, that is you couldn't do it with a custom partitioner prior to this
> KIP and now you can. I would consider
> that a secondary/additional improvement that these changes provide, but
> it's not strictly speaking related to
>  multi-casting, right? (Just checking my understanding, not challenging
> anything about this)
>
> Cheers,
> Sophie
>
> On Thu, Aug 18, 2022 at 7:27 AM Sagar <sa...@gmail.com> wrote:
>
> > Hello Sophie,
> >
> > Thanks for your feedback. I have made all the suggested changes.
> >
> > One note, on how users can accomplish this in today's world , I have made
> > up this example and have never tried myself before. But I am assuming it
> > will work.
> >
> > Let me know what you think.
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Thu, Aug 18, 2022 at 7:17 AM Sophie Blee-Goldman
> > <so...@confluent.io.invalid> wrote:
> >
> > > Hey Sagar, thanks for the KIP!
> > >
> > > Just some cosmetic points to make it absolutely clear what this KIP is
> > > doing:
> > > 1) could you clarify up front in the Motivation section that this is
> > > focused on Kafka Streams applications, and not the plain Producer
> client?
> > > 2) you included the entire implementation of the `#send` method to
> > > demonstrate the change in logic, but can you either remove the parts of
> > > the implementation that aren't being touched here or at least highlight
> > in
> > > some way the specific lines that have changed?
> > > 3) In general the implementation is, well, an implementation detail
> that
> > > doesn't need to be included in the KIP, but it's ok -- always nice to
> > get a
> > > sense of how things will work internally. But what I think would be
> more
> > > useful to show in the KIP is how things will work with the new public
> > > interface -- ie, can you provide a brief example of how a user would go
> > > about taking advantage of this new interface? Even better, include an
> > > example of what it takes for a user to accomplish this behavior before
> > this
> > > KIP. It would help showcase the concrete benefit this KIP is bringing
> and
> > > anchor the motivation section a bit better.
> > >
> > > Also nit: in the 2nd sentence under Public Interfaces I think it should
> > say
> > > "it would invoke the partition()  method" -- ie this should be
> > > "partition()" not "partition*s*()"
> > >
> > > Other than that this looks good, but I'll wait until you've addressed
> the
> > > above to cast a vote.
> > >
> > > Thanks!
> > > Sophie
> > >
> > > On Fri, Aug 12, 2022 at 10:36 PM Sagar <sa...@gmail.com>
> > wrote:
> > >
> > > > Hey John,
> > > >
> > > > Thanks for the vote. I added the reason for the rejection of the
> > > > alternatives. The first one is basically an option to broadcast to
> all
> > > > partitions which I felt was restrictive. Instead the KIP allows
> > > > multicasting to 0-N partitions based upon the partitioner
> > implementation.
> > > >
> > > > Thanks!
> > > > Sagar.
> > > >
> > > > On Sat, Aug 13, 2022 at 7:35 AM John Roesler <vv...@apache.org>
> > > wrote:
> > > >
> > > > > Thanks, Sagar!
> > > > >
> > > > > I’m +1 (binding)
> > > > >
> > > > > Can you add a short explanation to each rejected alternative? I was
> > > > > wondering why we wouldn’t provide an overloaded to()/addSink() (the
> > > first
> > > > > rejected alternative), and I had to look back at the Streams code
> to
> > > see
> > > > > that they both already accept the partitioner (I thought it was a
> > > > config).
> > > > >
> > > > > Thanks!
> > > > > -John
> > > > >
> > > > > On Tue, Aug 9, 2022, at 13:44, Walker Carlson wrote:
> > > > > > +1 (non binding)
> > > > > >
> > > > > > Walker
> > > > > >
> > > > > > On Tue, May 31, 2022 at 4:44 AM Sagar <sagarmeansocean@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > >> Hi All,
> > > > > >>
> > > > > >> I would like to start a voting thread on
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> > > > > >> .
> > > > > >>
> > > > > >> I am just starting this as the discussion thread has been open
> for
> > > 10+
> > > > > >> days. In case there are some comments, we can always discuss
> them
> > > over
> > > > > >> there.
> > > > > >>
> > > > > >> Thanks!
> > > > > >> Sagar.
> > > > > >>
> > > > >
> > > >
> > >
> >
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sophie Blee-Goldman <so...@confluent.io.INVALID>.
Thanks Sagar -- one thing I'm still confused about, and sorry to keep
pushing on this, but the example
you gave for how this works in today's world seems not to correspond to the
method described in the
text of the Motivation exception, ie

Currently, if a user wants to replicate a message into N partitions, the
> only way of doing that is to replicate the message N times and then plug-in
> a custom partitioner to write the message N times into N different
> partitions.



 This seems a little cumbersome way to broadcast. Also, there seems to be
> no way of dropping a record within the partitioner. This KIP aims to make
> this process simpler in Kafka Streams.


It sounds like you're saying the problem this KIP is fixing is that the
only way to do this is by implementing
a custom partitioner and that this is cumbersome, but that's actually
exactly what this KIP is doing: providing
a method od multi-casting via implementing a custom partitioner (as seen in
the example usage you provided).
Thanks to your examples I think I now understand better what the KIP is
doing, and assume what's written in
the motivation section is just a type/mistake -- can you confirm?

That said, the claim about having "no way of dropping a record within the
partitioner" does actually seem to be
correct, that is you couldn't do it with a custom partitioner prior to this
KIP and now you can. I would consider
that a secondary/additional improvement that these changes provide, but
it's not strictly speaking related to
 multi-casting, right? (Just checking my understanding, not challenging
anything about this)

Cheers,
Sophie

On Thu, Aug 18, 2022 at 7:27 AM Sagar <sa...@gmail.com> wrote:

> Hello Sophie,
>
> Thanks for your feedback. I have made all the suggested changes.
>
> One note, on how users can accomplish this in today's world , I have made
> up this example and have never tried myself before. But I am assuming it
> will work.
>
> Let me know what you think.
>
> Thanks!
> Sagar.
>
>
> On Thu, Aug 18, 2022 at 7:17 AM Sophie Blee-Goldman
> <so...@confluent.io.invalid> wrote:
>
> > Hey Sagar, thanks for the KIP!
> >
> > Just some cosmetic points to make it absolutely clear what this KIP is
> > doing:
> > 1) could you clarify up front in the Motivation section that this is
> > focused on Kafka Streams applications, and not the plain Producer client?
> > 2) you included the entire implementation of the `#send` method to
> > demonstrate the change in logic, but can you either remove the parts of
> > the implementation that aren't being touched here or at least highlight
> in
> > some way the specific lines that have changed?
> > 3) In general the implementation is, well, an implementation detail that
> > doesn't need to be included in the KIP, but it's ok -- always nice to
> get a
> > sense of how things will work internally. But what I think would be more
> > useful to show in the KIP is how things will work with the new public
> > interface -- ie, can you provide a brief example of how a user would go
> > about taking advantage of this new interface? Even better, include an
> > example of what it takes for a user to accomplish this behavior before
> this
> > KIP. It would help showcase the concrete benefit this KIP is bringing and
> > anchor the motivation section a bit better.
> >
> > Also nit: in the 2nd sentence under Public Interfaces I think it should
> say
> > "it would invoke the partition()  method" -- ie this should be
> > "partition()" not "partition*s*()"
> >
> > Other than that this looks good, but I'll wait until you've addressed the
> > above to cast a vote.
> >
> > Thanks!
> > Sophie
> >
> > On Fri, Aug 12, 2022 at 10:36 PM Sagar <sa...@gmail.com>
> wrote:
> >
> > > Hey John,
> > >
> > > Thanks for the vote. I added the reason for the rejection of the
> > > alternatives. The first one is basically an option to broadcast to all
> > > partitions which I felt was restrictive. Instead the KIP allows
> > > multicasting to 0-N partitions based upon the partitioner
> implementation.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > > On Sat, Aug 13, 2022 at 7:35 AM John Roesler <vv...@apache.org>
> > wrote:
> > >
> > > > Thanks, Sagar!
> > > >
> > > > I’m +1 (binding)
> > > >
> > > > Can you add a short explanation to each rejected alternative? I was
> > > > wondering why we wouldn’t provide an overloaded to()/addSink() (the
> > first
> > > > rejected alternative), and I had to look back at the Streams code to
> > see
> > > > that they both already accept the partitioner (I thought it was a
> > > config).
> > > >
> > > > Thanks!
> > > > -John
> > > >
> > > > On Tue, Aug 9, 2022, at 13:44, Walker Carlson wrote:
> > > > > +1 (non binding)
> > > > >
> > > > > Walker
> > > > >
> > > > > On Tue, May 31, 2022 at 4:44 AM Sagar <sa...@gmail.com>
> > > wrote:
> > > > >
> > > > >> Hi All,
> > > > >>
> > > > >> I would like to start a voting thread on
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> > > > >> .
> > > > >>
> > > > >> I am just starting this as the discussion thread has been open for
> > 10+
> > > > >> days. In case there are some comments, we can always discuss them
> > over
> > > > >> there.
> > > > >>
> > > > >> Thanks!
> > > > >> Sagar.
> > > > >>
> > > >
> > >
> >
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sagar <sa...@gmail.com>.
Hello Sophie,

Thanks for your feedback. I have made all the suggested changes.

One note, on how users can accomplish this in today's world , I have made
up this example and have never tried myself before. But I am assuming it
will work.

Let me know what you think.

Thanks!
Sagar.


On Thu, Aug 18, 2022 at 7:17 AM Sophie Blee-Goldman
<so...@confluent.io.invalid> wrote:

> Hey Sagar, thanks for the KIP!
>
> Just some cosmetic points to make it absolutely clear what this KIP is
> doing:
> 1) could you clarify up front in the Motivation section that this is
> focused on Kafka Streams applications, and not the plain Producer client?
> 2) you included the entire implementation of the `#send` method to
> demonstrate the change in logic, but can you either remove the parts of
> the implementation that aren't being touched here or at least highlight in
> some way the specific lines that have changed?
> 3) In general the implementation is, well, an implementation detail that
> doesn't need to be included in the KIP, but it's ok -- always nice to get a
> sense of how things will work internally. But what I think would be more
> useful to show in the KIP is how things will work with the new public
> interface -- ie, can you provide a brief example of how a user would go
> about taking advantage of this new interface? Even better, include an
> example of what it takes for a user to accomplish this behavior before this
> KIP. It would help showcase the concrete benefit this KIP is bringing and
> anchor the motivation section a bit better.
>
> Also nit: in the 2nd sentence under Public Interfaces I think it should say
> "it would invoke the partition()  method" -- ie this should be
> "partition()" not "partition*s*()"
>
> Other than that this looks good, but I'll wait until you've addressed the
> above to cast a vote.
>
> Thanks!
> Sophie
>
> On Fri, Aug 12, 2022 at 10:36 PM Sagar <sa...@gmail.com> wrote:
>
> > Hey John,
> >
> > Thanks for the vote. I added the reason for the rejection of the
> > alternatives. The first one is basically an option to broadcast to all
> > partitions which I felt was restrictive. Instead the KIP allows
> > multicasting to 0-N partitions based upon the partitioner implementation.
> >
> > Thanks!
> > Sagar.
> >
> > On Sat, Aug 13, 2022 at 7:35 AM John Roesler <vv...@apache.org>
> wrote:
> >
> > > Thanks, Sagar!
> > >
> > > I’m +1 (binding)
> > >
> > > Can you add a short explanation to each rejected alternative? I was
> > > wondering why we wouldn’t provide an overloaded to()/addSink() (the
> first
> > > rejected alternative), and I had to look back at the Streams code to
> see
> > > that they both already accept the partitioner (I thought it was a
> > config).
> > >
> > > Thanks!
> > > -John
> > >
> > > On Tue, Aug 9, 2022, at 13:44, Walker Carlson wrote:
> > > > +1 (non binding)
> > > >
> > > > Walker
> > > >
> > > > On Tue, May 31, 2022 at 4:44 AM Sagar <sa...@gmail.com>
> > wrote:
> > > >
> > > >> Hi All,
> > > >>
> > > >> I would like to start a voting thread on
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> > > >> .
> > > >>
> > > >> I am just starting this as the discussion thread has been open for
> 10+
> > > >> days. In case there are some comments, we can always discuss them
> over
> > > >> there.
> > > >>
> > > >> Thanks!
> > > >> Sagar.
> > > >>
> > >
> >
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sophie Blee-Goldman <so...@confluent.io.INVALID>.
Hey Sagar, thanks for the KIP!

Just some cosmetic points to make it absolutely clear what this KIP is
doing:
1) could you clarify up front in the Motivation section that this is
focused on Kafka Streams applications, and not the plain Producer client?
2) you included the entire implementation of the `#send` method to
demonstrate the change in logic, but can you either remove the parts of
the implementation that aren't being touched here or at least highlight in
some way the specific lines that have changed?
3) In general the implementation is, well, an implementation detail that
doesn't need to be included in the KIP, but it's ok -- always nice to get a
sense of how things will work internally. But what I think would be more
useful to show in the KIP is how things will work with the new public
interface -- ie, can you provide a brief example of how a user would go
about taking advantage of this new interface? Even better, include an
example of what it takes for a user to accomplish this behavior before this
KIP. It would help showcase the concrete benefit this KIP is bringing and
anchor the motivation section a bit better.

Also nit: in the 2nd sentence under Public Interfaces I think it should say
"it would invoke the partition()  method" -- ie this should be
"partition()" not "partition*s*()"

Other than that this looks good, but I'll wait until you've addressed the
above to cast a vote.

Thanks!
Sophie

On Fri, Aug 12, 2022 at 10:36 PM Sagar <sa...@gmail.com> wrote:

> Hey John,
>
> Thanks for the vote. I added the reason for the rejection of the
> alternatives. The first one is basically an option to broadcast to all
> partitions which I felt was restrictive. Instead the KIP allows
> multicasting to 0-N partitions based upon the partitioner implementation.
>
> Thanks!
> Sagar.
>
> On Sat, Aug 13, 2022 at 7:35 AM John Roesler <vv...@apache.org> wrote:
>
> > Thanks, Sagar!
> >
> > I’m +1 (binding)
> >
> > Can you add a short explanation to each rejected alternative? I was
> > wondering why we wouldn’t provide an overloaded to()/addSink() (the first
> > rejected alternative), and I had to look back at the Streams code to see
> > that they both already accept the partitioner (I thought it was a
> config).
> >
> > Thanks!
> > -John
> >
> > On Tue, Aug 9, 2022, at 13:44, Walker Carlson wrote:
> > > +1 (non binding)
> > >
> > > Walker
> > >
> > > On Tue, May 31, 2022 at 4:44 AM Sagar <sa...@gmail.com>
> wrote:
> > >
> > >> Hi All,
> > >>
> > >> I would like to start a voting thread on
> > >>
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> > >> .
> > >>
> > >> I am just starting this as the discussion thread has been open for 10+
> > >> days. In case there are some comments, we can always discuss them over
> > >> there.
> > >>
> > >> Thanks!
> > >> Sagar.
> > >>
> >
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Sagar <sa...@gmail.com>.
Hey John,

Thanks for the vote. I added the reason for the rejection of the
alternatives. The first one is basically an option to broadcast to all
partitions which I felt was restrictive. Instead the KIP allows
multicasting to 0-N partitions based upon the partitioner implementation.

Thanks!
Sagar.

On Sat, Aug 13, 2022 at 7:35 AM John Roesler <vv...@apache.org> wrote:

> Thanks, Sagar!
>
> I’m +1 (binding)
>
> Can you add a short explanation to each rejected alternative? I was
> wondering why we wouldn’t provide an overloaded to()/addSink() (the first
> rejected alternative), and I had to look back at the Streams code to see
> that they both already accept the partitioner (I thought it was a config).
>
> Thanks!
> -John
>
> On Tue, Aug 9, 2022, at 13:44, Walker Carlson wrote:
> > +1 (non binding)
> >
> > Walker
> >
> > On Tue, May 31, 2022 at 4:44 AM Sagar <sa...@gmail.com> wrote:
> >
> >> Hi All,
> >>
> >> I would like to start a voting thread on
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> >> .
> >>
> >> I am just starting this as the discussion thread has been open for 10+
> >> days. In case there are some comments, we can always discuss them over
> >> there.
> >>
> >> Thanks!
> >> Sagar.
> >>
>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by John Roesler <vv...@apache.org>.
Thanks, Sagar!

I’m +1 (binding)

Can you add a short explanation to each rejected alternative? I was wondering why we wouldn’t provide an overloaded to()/addSink() (the first rejected alternative), and I had to look back at the Streams code to see that they both already accept the partitioner (I thought it was a config). 

Thanks!
-John

On Tue, Aug 9, 2022, at 13:44, Walker Carlson wrote:
> +1 (non binding)
>
> Walker
>
> On Tue, May 31, 2022 at 4:44 AM Sagar <sa...@gmail.com> wrote:
>
>> Hi All,
>>
>> I would like to start a voting thread on
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
>> .
>>
>> I am just starting this as the discussion thread has been open for 10+
>> days. In case there are some comments, we can always discuss them over
>> there.
>>
>> Thanks!
>> Sagar.
>>

Re: [VOTE] KIP-837 Allow MultiCasting a Result Record.

Posted by Walker Carlson <wc...@confluent.io.INVALID>.
+1 (non binding)

Walker

On Tue, May 31, 2022 at 4:44 AM Sagar <sa...@gmail.com> wrote:

> Hi All,
>
> I would like to start a voting thread on
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> .
>
> I am just starting this as the discussion thread has been open for 10+
> days. In case there are some comments, we can always discuss them over
> there.
>
> Thanks!
> Sagar.
>