You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by satyanarayan komandur <ks...@gmail.com> on 2020/09/10 13:40:02 UTC

KIP-669: Preserve Source Partition in Kafka Streams from context

Hi,

I have submitted a new KIP for preserving processor record context partition from source. I am looking for suggestions/comments. 

In most use cases where source message is getting transformed and sent to a target topic, where
1. number of partitions on source and sink topic are same
2. there is no change to the key
3. more importantly if we would like to preserve the partition as is without re-deriving using partition from context would help.

I am aware of one caveat where record processor context partition is not known in stream punctuation.

Please look over the KIP and chime in more ideas

Thanks
Balan



Re: KIP-669: Preserve Source Partition in Kafka Streams from context

Posted by Balan k <ks...@gmail.com>.
Thanks for taking time to reply.
I thought about the confusion with overloads. creating another Partitioner seemed liked a good idea to start with, soon i realized the partitioner is interface written in a way which does not support anything other than key/value. It seems to me the idea of using RecordContext is a better option as suggested by Matthias. That would also open up the options for punctuate as well. I will look into 478 further and see if we can merge

On 2020/09/10 22:59:09, Sophie Blee-Goldman <so...@confluent.io> wrote: 
> Hey Balan, thanks for the KIP!
> 
> The motivation here makes sense to me, but I have a few questions about the
> proposed API
> 
> I guess the main thing to point out is that if we just add new addSink()
> overloads to Topology,
> then only the lower level Processor API will benefit and users of the DSL
> won't be able to utilize
> this. This seems like a useful feature that we should make available to
> anyone.
> 
> We could follow a similar approach and add new toStream overloads to the
> KStream class, but
> that would expand the surface area of the API pretty significantly. The
> additional addSink()
> overloads alone would do this. The addSink() methods already have a pretty
> large number
> of optional parameters which means more and more overloads every time a new
> one is added.
> We should avoid making this problem worse wherever possible.
> 
> Existing StreamPartitioner  in SinkNode will be made null when context
> > partition is enabled
> 
> 
> This line from your KIP gave me some idea that it might be avoidable in
> this case. The implication
> of this quote is that the StreamPartitioner and useContextPartition
> parameter are inherently
> incompatible since they are two ways of specifying the same thing, the
> target partition. Well, if
> that's the case, then we should be able to just leverage the existing
> StreamPartitioner in some
> way to specify that we want records to end up in the source partition,
> without introducing a new
> parameter.
> 
> One option would be to just let users pass in a null StreamPartitioner to
> mean that it should
> use the source partition, but that seems a bit subtle. Maybe a better API
> would be to offer
> a new out-of-the-box StreamPartitioner called SourceContextPartitioner (or
> something),
> and then users just have to pass in an instance of this. WDYT?
> 
> On Thu, Sep 10, 2020 at 8:00 AM Balan k <ks...@gmail.com> wrote:
> 
> >
> > Forgot to add the link
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-669%3A+Preserve+Source+Partition+in+Kafka+Streams+from+context
> >
> >
> >
> > On 2020/09/10 13:40:02, satyanarayan komandur <ks...@gmail.com>
> > wrote:
> > > Hi,
> > >
> > > I have submitted a new KIP for preserving processor record context
> > partition from source. I am looking for suggestions/comments.
> > >
> > > In most use cases where source message is getting transformed and sent
> > to a target topic, where
> > > 1. number of partitions on source and sink topic are same
> > > 2. there is no change to the key
> > > 3. more importantly if we would like to preserve the partition as is
> > without re-deriving using partition from context would help.
> > >
> > > I am aware of one caveat where record processor context partition is not
> > known in stream punctuation.
> > >
> > > Please look over the KIP and chime in more ideas
> > >
> > > Thanks
> > > Balan
> > >
> > >
> > >
> >
> 

Re: KIP-669: Preserve Source Partition in Kafka Streams from context

Posted by Balan k <ks...@gmail.com>.
Thanks Matthias for the reply
I think i like the idea of the ability to use Record context in the default partitioner itself. I will join the discussion for KIP 478 to understand the context.

On 2020/09/11 21:31:46, "Matthias J. Sax" <mj...@apache.org> wrote: 
> With regard to KIP-478, there is the idea to introduce a `RecordContext`
> class.
> 
> Thus, we could just change the `StreamPartitioner` to take this new
> class as parameter to `partition()`? This might actually kill two birds
> with one stone, because I could imagine use cases in which users want to
> partition based on header information that is currently not exposed either.
> 
> For this case, we don't even need to provide any default implementation
> of `StreamPartitioner` but users can just implement it themselves. The
> use case itself makes sense, but it does not seem to be generic enough
> that we need to provide an out-of-the-box implementation for it.
> 
> 
> -Matthias
> 
> On 9/10/20 3:59 PM, Sophie Blee-Goldman wrote:
> > Hey Balan, thanks for the KIP!
> > 
> > The motivation here makes sense to me, but I have a few questions about the
> > proposed API
> > 
> > I guess the main thing to point out is that if we just add new addSink()
> > overloads to Topology,
> > then only the lower level Processor API will benefit and users of the DSL
> > won't be able to utilize
> > this. This seems like a useful feature that we should make available to
> > anyone.
> > 
> > We could follow a similar approach and add new toStream overloads to the
> > KStream class, but
> > that would expand the surface area of the API pretty significantly. The
> > additional addSink()
> > overloads alone would do this. The addSink() methods already have a pretty
> > large number
> > of optional parameters which means more and more overloads every time a new
> > one is added.
> > We should avoid making this problem worse wherever possible.
> > 
> > Existing StreamPartitioner  in SinkNode will be made null when context
> >> partition is enabled
> > 
> > 
> > This line from your KIP gave me some idea that it might be avoidable in
> > this case. The implication
> > of this quote is that the StreamPartitioner and useContextPartition
> > parameter are inherently
> > incompatible since they are two ways of specifying the same thing, the
> > target partition. Well, if
> > that's the case, then we should be able to just leverage the existing
> > StreamPartitioner in some
> > way to specify that we want records to end up in the source partition,
> > without introducing a new
> > parameter.
> > 
> > One option would be to just let users pass in a null StreamPartitioner to
> > mean that it should
> > use the source partition, but that seems a bit subtle. Maybe a better API
> > would be to offer
> > a new out-of-the-box StreamPartitioner called SourceContextPartitioner (or
> > something),
> > and then users just have to pass in an instance of this. WDYT?
> > 
> > On Thu, Sep 10, 2020 at 8:00 AM Balan k <ks...@gmail.com> wrote:
> > 
> >>
> >> Forgot to add the link
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-669%3A+Preserve+Source+Partition+in+Kafka+Streams+from+context
> >>
> >>
> >>
> >> On 2020/09/10 13:40:02, satyanarayan komandur <ks...@gmail.com>
> >> wrote:
> >>> Hi,
> >>>
> >>> I have submitted a new KIP for preserving processor record context
> >> partition from source. I am looking for suggestions/comments.
> >>>
> >>> In most use cases where source message is getting transformed and sent
> >> to a target topic, where
> >>> 1. number of partitions on source and sink topic are same
> >>> 2. there is no change to the key
> >>> 3. more importantly if we would like to preserve the partition as is
> >> without re-deriving using partition from context would help.
> >>>
> >>> I am aware of one caveat where record processor context partition is not
> >> known in stream punctuation.
> >>>
> >>> Please look over the KIP and chime in more ideas
> >>>
> >>> Thanks
> >>> Balan
> >>>
> >>>
> >>>
> >>
> > 
> 
> 

Re: KIP-669: Preserve Source Partition in Kafka Streams from context

Posted by "Matthias J. Sax" <mj...@apache.org>.
With regard to KIP-478, there is the idea to introduce a `RecordContext`
class.

Thus, we could just change the `StreamPartitioner` to take this new
class as parameter to `partition()`? This might actually kill two birds
with one stone, because I could imagine use cases in which users want to
partition based on header information that is currently not exposed either.

For this case, we don't even need to provide any default implementation
of `StreamPartitioner` but users can just implement it themselves. The
use case itself makes sense, but it does not seem to be generic enough
that we need to provide an out-of-the-box implementation for it.


-Matthias

On 9/10/20 3:59 PM, Sophie Blee-Goldman wrote:
> Hey Balan, thanks for the KIP!
> 
> The motivation here makes sense to me, but I have a few questions about the
> proposed API
> 
> I guess the main thing to point out is that if we just add new addSink()
> overloads to Topology,
> then only the lower level Processor API will benefit and users of the DSL
> won't be able to utilize
> this. This seems like a useful feature that we should make available to
> anyone.
> 
> We could follow a similar approach and add new toStream overloads to the
> KStream class, but
> that would expand the surface area of the API pretty significantly. The
> additional addSink()
> overloads alone would do this. The addSink() methods already have a pretty
> large number
> of optional parameters which means more and more overloads every time a new
> one is added.
> We should avoid making this problem worse wherever possible.
> 
> Existing StreamPartitioner  in SinkNode will be made null when context
>> partition is enabled
> 
> 
> This line from your KIP gave me some idea that it might be avoidable in
> this case. The implication
> of this quote is that the StreamPartitioner and useContextPartition
> parameter are inherently
> incompatible since they are two ways of specifying the same thing, the
> target partition. Well, if
> that's the case, then we should be able to just leverage the existing
> StreamPartitioner in some
> way to specify that we want records to end up in the source partition,
> without introducing a new
> parameter.
> 
> One option would be to just let users pass in a null StreamPartitioner to
> mean that it should
> use the source partition, but that seems a bit subtle. Maybe a better API
> would be to offer
> a new out-of-the-box StreamPartitioner called SourceContextPartitioner (or
> something),
> and then users just have to pass in an instance of this. WDYT?
> 
> On Thu, Sep 10, 2020 at 8:00 AM Balan k <ks...@gmail.com> wrote:
> 
>>
>> Forgot to add the link
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-669%3A+Preserve+Source+Partition+in+Kafka+Streams+from+context
>>
>>
>>
>> On 2020/09/10 13:40:02, satyanarayan komandur <ks...@gmail.com>
>> wrote:
>>> Hi,
>>>
>>> I have submitted a new KIP for preserving processor record context
>> partition from source. I am looking for suggestions/comments.
>>>
>>> In most use cases where source message is getting transformed and sent
>> to a target topic, where
>>> 1. number of partitions on source and sink topic are same
>>> 2. there is no change to the key
>>> 3. more importantly if we would like to preserve the partition as is
>> without re-deriving using partition from context would help.
>>>
>>> I am aware of one caveat where record processor context partition is not
>> known in stream punctuation.
>>>
>>> Please look over the KIP and chime in more ideas
>>>
>>> Thanks
>>> Balan
>>>
>>>
>>>
>>
> 


Re: KIP-669: Preserve Source Partition in Kafka Streams from context

Posted by Sophie Blee-Goldman <so...@confluent.io>.
Hey Balan, thanks for the KIP!

The motivation here makes sense to me, but I have a few questions about the
proposed API

I guess the main thing to point out is that if we just add new addSink()
overloads to Topology,
then only the lower level Processor API will benefit and users of the DSL
won't be able to utilize
this. This seems like a useful feature that we should make available to
anyone.

We could follow a similar approach and add new toStream overloads to the
KStream class, but
that would expand the surface area of the API pretty significantly. The
additional addSink()
overloads alone would do this. The addSink() methods already have a pretty
large number
of optional parameters which means more and more overloads every time a new
one is added.
We should avoid making this problem worse wherever possible.

Existing StreamPartitioner  in SinkNode will be made null when context
> partition is enabled


This line from your KIP gave me some idea that it might be avoidable in
this case. The implication
of this quote is that the StreamPartitioner and useContextPartition
parameter are inherently
incompatible since they are two ways of specifying the same thing, the
target partition. Well, if
that's the case, then we should be able to just leverage the existing
StreamPartitioner in some
way to specify that we want records to end up in the source partition,
without introducing a new
parameter.

One option would be to just let users pass in a null StreamPartitioner to
mean that it should
use the source partition, but that seems a bit subtle. Maybe a better API
would be to offer
a new out-of-the-box StreamPartitioner called SourceContextPartitioner (or
something),
and then users just have to pass in an instance of this. WDYT?

On Thu, Sep 10, 2020 at 8:00 AM Balan k <ks...@gmail.com> wrote:

>
> Forgot to add the link
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-669%3A+Preserve+Source+Partition+in+Kafka+Streams+from+context
>
>
>
> On 2020/09/10 13:40:02, satyanarayan komandur <ks...@gmail.com>
> wrote:
> > Hi,
> >
> > I have submitted a new KIP for preserving processor record context
> partition from source. I am looking for suggestions/comments.
> >
> > In most use cases where source message is getting transformed and sent
> to a target topic, where
> > 1. number of partitions on source and sink topic are same
> > 2. there is no change to the key
> > 3. more importantly if we would like to preserve the partition as is
> without re-deriving using partition from context would help.
> >
> > I am aware of one caveat where record processor context partition is not
> known in stream punctuation.
> >
> > Please look over the KIP and chime in more ideas
> >
> > Thanks
> > Balan
> >
> >
> >
>

Re: KIP-669: Preserve Source Partition in Kafka Streams from context

Posted by Balan k <ks...@gmail.com>.
Forgot to add the link
https://cwiki.apache.org/confluence/display/KAFKA/KIP-669%3A+Preserve+Source+Partition+in+Kafka+Streams+from+context



On 2020/09/10 13:40:02, satyanarayan komandur <ks...@gmail.com> wrote: 
> Hi,
> 
> I have submitted a new KIP for preserving processor record context partition from source. I am looking for suggestions/comments. 
> 
> In most use cases where source message is getting transformed and sent to a target topic, where
> 1. number of partitions on source and sink topic are same
> 2. there is no change to the key
> 3. more importantly if we would like to preserve the partition as is without re-deriving using partition from context would help.
> 
> I am aware of one caveat where record processor context partition is not known in stream punctuation.
> 
> Please look over the KIP and chime in more ideas
> 
> Thanks
> Balan
> 
> 
>