Posted to dev@kafka.apache.org by Jorge Esteban Quilcate Otoya <qu...@gmail.com> on 2020/07/16 23:05:09 UTC

Re: [DISCUSS] KIP-634: Complementary support for headers in Kafka Streams DSL

Hi everyone,

Bumping this thread to check if there's any feedback.

Cheers,
Jorge.

On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate Otoya <
quilcate.jorge@gmail.com> wrote:

> Hi everyone,
>
> I would like to start the discussion for KIP-634: https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
>
> Looking forward to your feedback.
>
> Thanks!
> Jorge.
>
>
>
>

Re: [DISCUSS] KIP-634: Complementary support for headers in Kafka Streams DSL

Posted by Jorge Esteban Quilcate Otoya <qu...@gmail.com>.
Hi all,

With the acceptance of KIP-820 which will enable easier access to the
Record's metadata and headers and the potential design of a new version of
the DSL, I will set this KIP as inactive/dormant for the time being.

Thanks, everyone, for the great discussions!
Jorge.

On Wed, 16 Feb 2022 at 00:08, Matthias J. Sax <mj...@apache.org> wrote:

> Sorry for playing devil's advocate, but do we really think it's a good
> way to design the API? To me, it feels like a cumbersome workaround.
>
> Personally, I believe that we are hitting a point for the DSL that
> requires a redesign from scratch. When the DSL was designed 5 years ago,
> record timestamp was just newly added (and did not play a significant
> role yet), and there were no record headers. That's why we have a
> kv-based model with `KStream<K,V>` and `KTable<K,V>` types.
>
> Given the changes in Kafka (and Kafka Streams) that accumulated over the
> last 5 years, it seems that a better API would be to have a
> `KStream<Record<K,V>>` and `KTable<Record<K,V>>` model.
>
> I also think that we should clearly separate (modifiable) data (key,
> value, timestamp, headers) from (immutable) meta-data (topic name,
> partition, offset). And to go one step further, I am not even sure if
> meta-data makes much sense for non-source records (e.g., `offset` is
> more or less useless after a flatMap(), aggregation() -- similar for
> topic and partition after a join()). It would make sense to me, to take
> considerations like this into account when designing the API.
>
> I would propose to not move forward with a "hacky design" (sorry for the
> strong terminology...) but to start a DSL 2.0 discussion to come up
> with a long term sensible re-design. Of course, it would be a much
> larger effort, and might not provide a short term fix.
>
> A DSL 2.0 would not only allow us to add support for headers, but also
> to fix many other issues in the DSL that are not easily fixed without
> breaking compatibility (one example is KIP-300:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-300%3A+Add+Windowed+KTable+API+in+StreamsBuilder
> )
>
> Thoughts?
>
>
> -Matthias
>
> On 2/15/22 11:55 AM, John Roesler wrote:
> > Thanks for the update, Jorge,
> >
> > I've just looked over the KIP again. Just one more small
> > concern:
> >
> > 5) We can't just change the type of Record#headers() to a
> > new fully qualified type. That would be a source-
> > incompatible breaking change for users.
> >
> > Our options are:
> > * Deprecate the existing method and create a new one with
> > the new type
> > * If the existing Headers is "not great but ok", then maybe
> > we leave it alone.
> >
> > Thanks,
> > -John
> >
> >
> > On Fri, 2022-02-11 at 20:40 +0000, Jorge Esteban Quilcate
> > Otoya wrote:
> >> John and team,
> >>
> >> The following changes have been applied to the KIP following your
> feedback:
> >>
> >> - Leverage `Record<K, V>` instead of introducing a new type
> >> (`RecordValue<V>`).
> >> - `RecordSerde<K, V>` for stateful operations using `Record<K, V>` as
> value.
> >> - Extend `Record<K, V>` to:
> >>    - Implement `RecordMetadata` to expose `topic`, `partition`, and
> `offset`
> >>    - Use the `Headers` abstraction introduced in this KIP instead of the core one
> >>
> >> KIP:
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> >>
> >> Looking forward to your feedback.
> >>
> >> Have a great weekend!
> >>
> >> On Thu, 10 Feb 2022 at 13:15, Jorge Esteban Quilcate Otoya <
> >> quilcate.jorge@gmail.com> wrote:
> >>
> >>>> What do you think about instead adding topic and
> >>> partition to Record?
> >>>
> >>> This is a very interesting idea. Forgot to consider this addition from
> >>> KIP-478.
> >>>
> >>> `Record` would also require `offset`. Maybe implementing
> `RecordMetadata`
> >>> and adding these fields as part of the constructor to keep them
> immutable
> >>> in comparison to the other fields?
> >>> It would also need to change `Record`'s headers type to the new one
> >>> proposed on this KIP.
> >>>
> >>> Let me explore this approach in more detail and update the KIP.
> >>>
> >>>> I find the name "mapRecordValue" to be a bit confusing
> >>>    because it seems like it would map the value of a record.
> >>>    What do you think about "mapValueToRecord" instead?
> >>>
> >>> Agree. It will depend on how we solve 1). If we end up using `Record`
> then
> >>> `mapValueToRecord` will make even more sense.
> >>>
> >>>> I agree with adding the serde explicitly. However, it
> >>> would be good to state whether and when we'll automatically
> >>> wrap a value serde. For example, if the value serde is known
> >>> (or if we're using a default serde from the config), will
> >>> Streams automatically wrap it downstream of the record-
> >>> mapping operator?
> >>>
> >>> Good point. The goal is as you describe it: only when
> `mapValueToRecord`
> >>> is called, the Serde will be implicitly applied.
> >>> Will make this explicit on the KIP.
> >>>
> >>>
> >>> On Wed, 9 Feb 2022 at 20:05, John Roesler <vv...@apache.org> wrote:
> >>>
> >>>> Hello Jorge,
> >>>>
> >>>> Thanks for bringing this up again!
> >>>>
> >>>> I've just read over the current version of the KIP.
> >>>>
> >>>> 1) I wonder if we really need RecordValue, since we now have
> >>>> Record, and they are almost the same, both in API and in
> >>>> purpose. What do you think about instead adding topic and
> >>>> partition to Record?
> >>>>
> >>>> 2) I find the name "mapRecordValue" to be a bit confusing
> >>>> because it seems like it would map the value of a record.
> >>>> What do you think about "mapValueToRecord" instead?
> >>>>
> >>>> 3) I agree with adding the serde explicitly. However, it
> >>>> would be good to state whether and when we'll automatically
> >>>> wrap a value serde. For example, if the value serde is known
> >>>> (or if we're using a default serde from the config), will
> >>>> Streams automatically wrap it downstream of the record-
> >>>> mapping operator?
> >>>>
> >>>> Otherwise, your proposal looks good to me!
> >>>>
> >>>> Thanks,
> >>>> -John
> >>>>
> >>>> On Tue, 2022-02-08 at 18:06 +0000, Jorge Esteban Quilcate
> >>>> Otoya wrote:
> >>>>> Hi Dev team,
> >>>>>
> >>>>> I'd like to revamp the KIP again:
> >>>>>
> >>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> >>>>>
> >>>>> - Reference implementation is now using the latest `Processor` API
> from
> >>>>> KIP-478: https://github.com/apache/kafka/pull/10265/files for both
> >>>>> Processors backing changes on the KStream API.
> >>>>> - It is proposing to still extend `To` class for backwards
> >>>> compatibility.
> >>>>>
> >>>>> Looking forward to your feedback.
> >>>>>
> >>>>> Regards,
> >>>>> Jorge.
> >>>>>
> >>>>> On Thu, 4 Mar 2021 at 18:38, Jorge Esteban Quilcate Otoya <
> >>>>> quilcate.jorge@gmail.com> wrote:
> >>>>>
> >>>>>> Hi everyone!
> >>>>>>
> >>>>>> I'd like to revamp this KIP. I have made some significant changes on
> >>>> the
> >>>>>> scope:
> >>>>>> - Added `mapRecordValue` to map not only headers, but other record
> >>>>>> metadata: topic name, partition, offset, and timestamp into a new
> type
> >>>>>> `RecordValue<V>`.
> >>>>>> - Added a serde for `RecordValue` to support stateful operations.
> >>>>>> - Added `setRecordHeaders` to apply headers to record crossing the
> >>>> stream.
> >>>>>> - Added headers to `To` to update headers via `context.forward(k, v,
> >>>> to)`.
> >>>>>>
> >>>>>> New link:
> >>>>>>
> >>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> >>>>>>
> >>>>>> Looking forward to your feedback,
> >>>>>>
> >>>>>> Cheers and stay safe,
> >>>>>> Jorge.
> >>>>>>
> >>>>>> On Thu, Oct 29, 2020 at 12:33 AM Jorge Esteban Quilcate Otoya <
> >>>>>> quilcate.jorge@gmail.com> wrote:
> >>>>>>
> >>>>>>> Thanks Sophie! Haven't followed KIP-478 but sounds great.
> >>>>>>> I'll be happy to help on that migration to the new PAPI if it's
> >>>> still an
> >>>>>>> open issue. We can bump this KIP after that.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Jorge.
> >>>>>>>
> >>>>>>> On Wed, Oct 28, 2020 at 7:00 PM Sophie Blee-Goldman <
> >>>> sophie@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> I *think* that the `To` Matthias was referring to was not
> >>>> KStream#to but
> >>>>>>>> the To class
> >>>>>>>> which is accepted as a possible parameter of
> >>>> ProcessorContext#forward
> >>>>>>>> (correct
> >>>>>>>> me if wrong).
> >>>>>>>>
> >>>>>>>> This was on the old ProcessorContext interface, which has now been
> >>>>>>>> replaced with
> >>>>>>>> the new api.ProcessorContext in KIP-478. In the new interface
> >>>> we've moved
> >>>>>>>> away
> >>>>>>>> from the forward signatures that accept a separate key or value or
> >>>>>>>> timestamp or To,
> >>>>>>>> and wrapped all of these into a single Record class. This new
> >>>> Record
> >>>>>>>> class
> >>>>>>>> has the
> >>>>>>>> headers as a field, so it seems like KIP-478 has happened to
> >>>> solve the
> >>>>>>>> lack
> >>>>>>>> of support
> >>>>>>>> for Headers in the PAPI along the way.
> >>>>>>>>
> >>>>>>>> This is all somewhat recent, and probably wasn't yet sorted out
> >>>> at the
> >>>>>>>> time
> >>>>>>>> of Matthias'
> >>>>>>>> last reply. But given how this worked out it seems like we can
> >>>> just focus
> >>>>>>>> on adding
> >>>>>>>> support for Headers in the DSL in this KIP by building off of the
> >>>>>>>> groundwork of
> >>>>>>>> KIP-478? It doesn't seem necessary to go back and add support for
> >>>> headers
> >>>>>>>> in the old
> >>>>>>>> PAPI, since it will be (or already has been?) deprecated.
> >>>>>>>>
> >>>>>>>> The one challenge is that this will presumably require that we
> >>>> migrate
> >>>>>>>> all
> >>>>>>>> DSL operators
> >>>>>>>> to the new PAPI before adding header support for those operators.
> >>>> But
> >>>>>>>> that
> >>>>>>>> definitely
> >>>>>>>> sounds achievable here
> >>>>>>>>
> >>>>>>>> On Wed, Oct 28, 2020 at 11:10 AM Jorge Esteban Quilcate Otoya <
> >>>>>>>> quilcate.jorge@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Matthias,
> >>>>>>>>>
> >>>>>>>>> Sorry for the late reply.
> >>>>>>>>>
> >>>>>>>>> I like the proposal. Just to check if I got it right:
> >>>>>>>>>
> >>>>>>>>> We can extend the `kstream.to()` function to support setting
> >>>> headers.
> >>>>>>>>> e.g.:
> >>>>>>>>>
> >>>>>>>>> ```
> >>>>>>>>>      void to(final String topic,
> >>>>>>>>>              final Produced<K, V> produced,
> >>>>>>>>>              final HeadersExtractor<K, V> headersExtractor);
> >>>>>>>>> ```
> >>>>>>>>>
> >>>>>>>>> where `HeadersExtractor`:
> >>>>>>>>>
> >>>>>>>>> ```
> >>>>>>>>> public interface HeadersExtractor<K, V> {
> >>>>>>>>>      Headers extract(final K key, final V value, final
> >>>> RecordContext
> >>>>>>>>> recordContext);
> >>>>>>>>> }
> >>>>>>>>> ```
> >>>>>>>>>
> >>>>>>>>>   This would require changing `Topology#addSink()` to support
> >>>> this
> >>>>>>>>> extractor as well.
> >>>>>>>>>
> >>>>>>>>> If this is aligned with your proposal, I'm happy to add it to
> >>>> this KIP.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Jorge.
> >>>>>>>>>
> >>>>>>>>> On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <
> >>>> mjsax@apache.org>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Jorge,
> >>>>>>>>>>
> >>>>>>>>>> thanks a lot for this KIP. Being able to modify headers is a
> >>>> very
> >>>>>>>>>> valuable feature.
> >>>>>>>>>>
> >>>>>>>>>> However, before we actually expose them in the DSL, I am
> >>>> wondering
> >>>>>>>> if we
> >>>>>>>>>> should improve how headers can be modified in the PAPI?
> >>>> Currently,
> >>>>>>>> it is
> >>>>>>>>>> possible but very clumsy to work with headers in the
> >>>> Processor API,
> >>>>>>>>>> because of two reasons:
> >>>>>>>>>>
> >>>>>>>>>>   (1) There is no default `Headers` implementation in the
> >>>> public API
> >>>>>>>>>>   (2) There is no explicit way to set headers for output
> >>>> records
> >>>>>>>>>>
> >>>>>>>>>> Currently, the input record headers are copied into the output
> >>>>>>>> records
> >>>>>>>>>> when `forward()` is called, however, it's not really a deep
> >>>> copy but
> >>>>>>>> we
> >>>>>>>>>> just copy the reference. This implies that one needs to work
> >>>> with a
> >>>>>>>>>> single mutable object that flows through multiple processors
> >>>> making
> >>>>>>>> it
> >>>>>>>>>> very error prone.
> >>>>>>>>>>
> >>>>>>>>>> Furthermore, if you want to emit multiple output records, and
> >>>> for
> >>>>>>>>>> example want to add two different headers to the output record
> >>>>>>>> (based on
> >>>>>>>>>> the same input headers), you would need to do something like
> >>>> this:
> >>>>>>>>>>
> >>>>>>>>>>    Headers h = context.headers();
> >>>>>>>>>>    h.add(...);
> >>>>>>>>>>    context.forward(...);
> >>>>>>>>>>    // remove the header you added for the first output record
> >>>>>>>>>>    h.remove(...);
> >>>>>>>>>>    h.add(...);
> >>>>>>>>>>    context.forward(...);
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Maybe we could extend `To` to allow passing in a new
> >>>> `Headers` object
> >>>>>>>>>> (or an `Iterable<Header>` similar to `ProducerRecord`)? We
> >>>> could
> >>>>>>>> either
> >>>>>>>>>> add it to your KIP or do a new KIP just for the PAPI.
> >>>>>>>>>>
> >>>>>>>>>> Thoughts?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
> >>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>
> >>>>>>>>>>> Bumping this thread to check if there's any feedback.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Jorge.
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate
> >>>> Otoya <
> >>>>>>>>>>> quilcate.jorge@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I would like to start the discussion for KIP-634:
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
> >>>>>>>>>>>>
> >>>>>>>>>>>> Looking forward to your feedback.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks!
> >>>>>>>>>>>> Jorge.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>
> >>>>
> >
>

Re: [DISCUSS] KIP-634: Complementary support for headers in Kafka Streams DSL

Posted by "Matthias J. Sax" <mj...@apache.org>.
Sorry for playing devil's advocate, but do we really think it's a good 
way to design the API? To me, it feels like a cumbersome workaround.

Personally, I believe that we are hitting a point for the DSL that 
requires a redesign from scratch. When the DSL was designed 5 years ago, 
record timestamp was just newly added (and did not play a significant 
role yet), and there were no record headers. That's why we have a
kv-based model with `KStream<K,V>` and `KTable<K,V>` types.

Given the changes in Kafka (and Kafka Streams) that accumulated over the 
last 5 years, it seems that a better API would be to have a 
`KStream<Record<K,V>>` and `KTable<Record<K,V>>` model.

I also think that we should clearly separate (modifiable) data (key, 
value, timestamp, headers) from (immutable) meta-data (topic name, 
partition, offset). And to go one step further, I am not even sure if
meta-data makes much sense for non-source records (e.g., `offset` is 
more or less useless after a flatMap(), aggregation() -- similar for 
topic and partition after a join()). It would make sense to me, to take 
considerations like this into account when designing the API.
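
A minimal sketch of the data/metadata split described above, under stated
assumptions: `StreamRecord` and `SourceMetadata` are hypothetical names that do
not exist in Kafka Streams, and header values are modeled as strings rather than
byte arrays to keep the example short. Key, value, timestamp, and headers can be
replaced through copy methods, while topic, partition, and offset sit in an
immutable object that is present only for records read directly from a topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical types for illustration only; none of these exist in Kafka Streams.

/** Immutable metadata that is only meaningful for records read from a source topic. */
record SourceMetadata(String topic, int partition, long offset) {}

/** Modifiable data (key, value, timestamp, headers) plus optional source metadata. */
record StreamRecord<K, V>(K key, V value, long timestamp,
                          List<Map.Entry<String, String>> headers,
                          Optional<SourceMetadata> source) {

    StreamRecord {
        // Defensive copy, so two records never share one mutable header list.
        headers = List.copyOf(headers);
    }

    /** Returns a copy with a new value; the source metadata is carried over unchanged. */
    <V2> StreamRecord<K, V2> withValue(V2 newValue) {
        return new StreamRecord<>(key, newValue, timestamp, headers, source);
    }

    /** Returns a copy with one more header; the original record is left untouched. */
    StreamRecord<K, V> withHeader(String name, String headerValue) {
        List<Map.Entry<String, String>> copy = new ArrayList<>(headers);
        copy.add(Map.entry(name, headerValue));
        return new StreamRecord<>(key, value, timestamp, copy, source);
    }

    /** For derived records (e.g. after an aggregation or join) the source metadata is dropped. */
    StreamRecord<K, V> derived() {
        return new StreamRecord<>(key, value, timestamp, headers, Optional.empty());
    }
}
```

Making the metadata optional is only one possible way to express that `offset`,
topic, and partition stop being meaningful for non-source records; a redesigned
DSL could just as well use separate types for source and derived records.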

I would propose to not move forward with a "hacky design" (sorry for the 
strong terminology...) but to start a DSL 2.0 discussion to come up 
with a long term sensible re-design. Of course, it would be a much 
larger effort, and might not provide a short term fix.

A DSL 2.0 would not only allow us to add support for headers, but also 
to fix many other issues in the DSL that are not easily fixed without 
breaking compatibility (one example is KIP-300: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-300%3A+Add+Windowed+KTable+API+in+StreamsBuilder)

Thoughts?


-Matthias

On 2/15/22 11:55 AM, John Roesler wrote:
> Thanks for the update, Jorge,
> 
> I've just looked over the KIP again. Just one more small
> concern:
> 
> 5) We can't just change the type of Record#headers() to a
> new fully qualified type. That would be a source-
> incompatible breaking change for users.
> 
> Our options are:
> * Deprecate the existing method and create a new one with
> the new type
> * If the existing Headers is "not great but ok", then maybe
> we leave it alone.
> 
> Thanks,
> -John
> 
> 
> On Fri, 2022-02-11 at 20:40 +0000, Jorge Esteban Quilcate
> Otoya wrote:
>> John and team,
>>
>> The following changes have been applied to the KIP following your feedback:
>>
>> - Leverage `Record<K, V>` instead of introducing a new type
>> (`RecordValue<V>`).
>> - `RecordSerde<K, V>` for stateful operations using `Record<K, V>` as value.
>> - Extend `Record<K, V>` to:
>>    - Implement `RecordMetadata` to expose `topic`, `partition`, and `offset`
>>    - Use the `Headers` abstraction introduced in this KIP instead of the core one
>>
>> KIP:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
>>
>> Looking forward to your feedback.
>>
>> Have a great weekend!
>>
>> On Thu, 10 Feb 2022 at 13:15, Jorge Esteban Quilcate Otoya <
>> quilcate.jorge@gmail.com> wrote:
>>
>>>> What do you think about instead adding topic and
>>> partition to Record?
>>>
>>> This is a very interesting idea. Forgot to consider this addition from
>>> KIP-478.
>>>
>>> `Record` would also require `offset`. Maybe implementing `RecordMetadata`
>>> and adding these fields as part of the constructor to keep them immutable
>>> in comparison to the other fields?
>>> It would also need to change `Record`'s headers type to the new one
>>> proposed on this KIP.
>>>
>>> Let me explore this approach in more detail and update the KIP.
>>>
>>>> I find the name "mapRecordValue" to be a bit confusing
>>>    because it seems like it would map the value of a record.
>>>    What do you think about "mapValueToRecord" instead?
>>>
>>> Agree. It will depend on how we solve 1). If we end up using `Record` then
>>> `mapValueToRecord` will make even more sense.
>>>
>>>> I agree with adding the serde explicitly. However, it
>>> would be good to state whether and when we'll automatically
>>> wrap a value serde. For example, if the value serde is known
>>> (or if we're using a default serde from the config), will
>>> Streams automatically wrap it downstream of the record-
>>> mapping operator?
>>>
>>> Good point. The goal is as you describe it: only when `mapValueToRecord`
>>> is called, the Serde will be implicitly applied.
>>> Will make this explicit on the KIP.
>>>
>>>
>>> On Wed, 9 Feb 2022 at 20:05, John Roesler <vv...@apache.org> wrote:
>>>
>>>> Hello Jorge,
>>>>
>>>> Thanks for bringing this up again!
>>>>
>>>> I've just read over the current version of the KIP.
>>>>
>>>> 1) I wonder if we really need RecordValue, since we now have
>>>> Record, and they are almost the same, both in API and in
>>>> purpose. What do you think about instead adding topic and
>>>> partition to Record?
>>>>
>>>> 2) I find the name "mapRecordValue" to be a bit confusing
>>>> because it seems like it would map the value of a record.
>>>> What do you think about "mapValueToRecord" instead?
>>>>
>>>> 3) I agree with adding the serde explicitly. However, it
>>>> would be good to state whether and when we'll automatically
>>>> wrap a value serde. For example, if the value serde is known
>>>> (or if we're using a default serde from the config), will
>>>> Streams automatically wrap it downstream of the record-
>>>> mapping operator?
>>>>
>>>> Otherwise, your proposal looks good to me!
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Tue, 2022-02-08 at 18:06 +0000, Jorge Esteban Quilcate
>>>> Otoya wrote:
>>>>> Hi Dev team,
>>>>>
>>>>> I'd like to revamp the KIP again:
>>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
>>>>>
>>>>> - Reference implementation is now using the latest `Processor` API from
>>>>> KIP-478: https://github.com/apache/kafka/pull/10265/files for both
>>>>> Processors backing changes on the KStream API.
>>>>> - It is proposing to still extend `To` class for backwards
>>>> compatibility.
>>>>>
>>>>> Looking forward to your feedback.
>>>>>
>>>>> Regards,
>>>>> Jorge.
>>>>>
>>>>> On Thu, 4 Mar 2021 at 18:38, Jorge Esteban Quilcate Otoya <
>>>>> quilcate.jorge@gmail.com> wrote:
>>>>>
>>>>>> Hi everyone!
>>>>>>
>>>>>> I'd like to revamp this KIP. I have made some significant changes on
>>>> the
>>>>>> scope:
>>>>>> - Added `mapRecordValue` to map not only headers, but other record
>>>>>> metadata: topic name, partition, offset, and timestamp into a new type
>>>>>> `RecordValue<V>`.
>>>>>> - Added a serde for `RecordValue` to support stateful operations.
>>>>>> - Added `setRecordHeaders` to apply headers to record crossing the
>>>> stream.
>>>>>> - Added headers to `To` to update headers via `context.forward(k, v,
>>>> to)`.
>>>>>>
>>>>>> New link:
>>>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
>>>>>>
>>>>>> Looking forward to your feedback,
>>>>>>
>>>>>> Cheers and stay safe,
>>>>>> Jorge.
>>>>>>
>>>>>> On Thu, Oct 29, 2020 at 12:33 AM Jorge Esteban Quilcate Otoya <
>>>>>> quilcate.jorge@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Sophie! Haven't followed KIP-478 but sounds great.
>>>>>>> I'll be happy to help on that migration to the new PAPI if it's
>>>> still an
>>>>>>> open issue. We can bump this KIP after that.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Jorge.
>>>>>>>
>>>>>>> On Wed, Oct 28, 2020 at 7:00 PM Sophie Blee-Goldman <
>>>> sophie@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I *think* that the `To` Matthias was referring to was not
>>>> KStream#to but
>>>>>>>> the To class
>>>>>>>> which is accepted as a possible parameter of
>>>> ProcessorContext#forward
>>>>>>>> (correct
>>>>>>>> me if wrong).
>>>>>>>>
>>>>>>>> This was on the old ProcessorContext interface, which has now been
>>>>>>>> replaced with
>>>>>>>> the new api.ProcessorContext in KIP-478. In the new interface
>>>> we've moved
>>>>>>>> away
>>>>>>>> from the forward signatures that accept a separate key or value or
>>>>>>>> timestamp or To,
>>>>>>>> and wrapped all of these into a single Record class. This new
>>>> Record
>>>>>>>> class
>>>>>>>> has the
>>>>>>>> headers as a field, so it seems like KIP-478 has happened to
>>>> solve the
>>>>>>>> lack
>>>>>>>> of support
>>>>>>>> for Headers in the PAPI along the way.
>>>>>>>>
>>>>>>>> This is all somewhat recent, and probably wasn't yet sorted out
>>>> at the
>>>>>>>> time
>>>>>>>> of Matthias'
>>>>>>>> last reply. But given how this worked out it seems like we can
>>>> just focus
>>>>>>>> on adding
>>>>>>>> support for Headers in the DSL in this KIP by building off of the
>>>>>>>> groundwork of
>>>>>>>> KIP-478? It doesn't seem necessary to go back and add support for
>>>> headers
>>>>>>>> in the old
>>>>>>>> PAPI, since it will be (or already has been?) deprecated.
>>>>>>>>
>>>>>>>> The one challenge is that this will presumably require that we
>>>> migrate
>>>>>>>> all
>>>>>>>> DSL operators
>>>>>>>> to the new PAPI before adding header support for those operators.
>>>> But
>>>>>>>> that
>>>>>>>> definitely
>>>>>>>> sounds achievable here
>>>>>>>>
>>>>>>>> On Wed, Oct 28, 2020 at 11:10 AM Jorge Esteban Quilcate Otoya <
>>>>>>>> quilcate.jorge@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Matthias,
>>>>>>>>>
>>>>>>>>> Sorry for the late reply.
>>>>>>>>>
>>>>>>>>> I like the proposal. Just to check if I got it right:
>>>>>>>>>
>>>>>>>>> We can extend the `kstream.to()` function to support setting
>>>> headers.
>>>>>>>>> e.g.:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>>      void to(final String topic,
>>>>>>>>>              final Produced<K, V> produced,
>>>>>>>>>              final HeadersExtractor<K, V> headersExtractor);
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> where `HeadersExtractor`:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> public interface HeadersExtractor<K, V> {
>>>>>>>>>      Headers extract(final K key, final V value, final
>>>> RecordContext
>>>>>>>>> recordContext);
>>>>>>>>> }
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>>   This would require changing `Topology#addSink()` to support
>>>> this
>>>>>>>>> extractor as well.
>>>>>>>>>
>>>>>>>>> If this is aligned with your proposal, I'm happy to add it to
>>>> this KIP.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Jorge.
>>>>>>>>>
>>>>>>>>> On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <
>>>> mjsax@apache.org>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Jorge,
>>>>>>>>>>
>>>>>>>>>> thanks a lot for this KIP. Being able to modify headers is a
>>>> very
>>>>>>>>>> valuable feature.
>>>>>>>>>>
>>>>>>>>>> However, before we actually expose them in the DSL, I am
>>>> wondering
>>>>>>>> if we
>>>>>>>>>> should improve how headers can be modified in the PAPI?
>>>> Currently,
>>>>>>>> it is
>>>>>>>>>> possible but very clumsy to work with headers in the
>>>> Processor API,
>>>>>>>>>> because of two reasons:
>>>>>>>>>>
>>>>>>>>>>   (1) There is no default `Headers` implementation in the
>>>> public API
>>>>>>>>>>   (2) There is no explicit way to set headers for output
>>>> records
>>>>>>>>>>
>>>>>>>>>> Currently, the input record headers are copied into the output
>>>>>>>> records
>>>>>>>>>> when `forward()` is called, however, it's not really a deep
>>>> copy but
>>>>>>>> we
>>>>>>>>>> just copy the reference. This implies that one needs to work
>>>> with a
>>>>>>>>>> single mutable object that flows through multiple processors
>>>> making
>>>>>>>> it
>>>>>>>>>> very error prone.
>>>>>>>>>>
>>>>>>>>>> Furthermore, if you want to emit multiple output records, and
>>>> for
>>>>>>>>>> example want to add two different headers to the output record
>>>>>>>> (based on
>>>>>>>>>> the same input headers), you would need to do something like
>>>> this:
>>>>>>>>>>
>>>>>>>>>>    Headers h = context.headers();
>>>>>>>>>>    h.add(...);
>>>>>>>>>>    context.forward(...);
>>>>>>>>>>    // remove the header you added for the first output record
>>>>>>>>>>    h.remove(...);
>>>>>>>>>>    h.add(...);
>>>>>>>>>>    context.forward(...);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Maybe we could extend `To` to allow passing in a new
>>>> `Headers` object
>>>>>>>>>> (or an `Iterable<Header>` similar to `ProducerRecord`)? We
>>>> could
>>>>>>>> either
>>>>>>>>>> add it to your KIP or do a new KIP just for the PAPI.
>>>>>>>>>>
>>>>>>>>>> Thoughts?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>
>>>>>>>>>>> Bumping this thread to check if there's any feedback.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Jorge.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate
>>>> Otoya <
>>>>>>>>>>> quilcate.jorge@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>
>>>>>>>>>>>> I would like to start the discussion for KIP-634:
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
>>>>>>>>>>>>
>>>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>> Jorge.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>
>>>>
> 
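
The aliasing problem raised in the quoted September 2020 message (`forward()`
hands on the reference to the input headers rather than a copy) can be
illustrated without Kafka. This is a stand-alone sketch under stated
assumptions: `forwardByReference` and `forwardWithCopy` are hypothetical
stand-ins for a processor forwarding a record, headers are modeled as a plain
list of strings, and the `out` list plays the role of anything downstream that
keeps hold of the forwarded headers.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone illustration; plain lists stand in for Kafka's Headers.
final class HeaderAliasingSketch {

    /** Forwards the same headers object: every output shares one mutable list. */
    static void forwardByReference(List<String> headers, List<List<String>> out) {
        out.add(headers);
    }

    /** Forwards a snapshot: later changes to the input no longer leak downstream. */
    static void forwardWithCopy(List<String> headers, List<List<String>> out) {
        out.add(new ArrayList<>(headers));
    }

    public static void main(String[] args) {
        List<String> headers = new ArrayList<>(List.of("source=orders"));

        // Same add / forward / remove / add / forward sequence as in the message.
        List<List<String>> shared = new ArrayList<>();
        headers.add("branch=a");
        forwardByReference(headers, shared);
        headers.remove("branch=a");
        headers.add("branch=b");
        forwardByReference(headers, shared);
        // Both entries are the same list object, so the first output has lost "branch=a".
        System.out.println("by reference: " + shared);

        // With a copy per output, each record keeps the headers it was forwarded with.
        List<String> fresh = new ArrayList<>(List.of("source=orders"));
        List<List<String>> copied = new ArrayList<>();
        fresh.add("branch=a");
        forwardWithCopy(fresh, copied);
        fresh.remove("branch=a");
        fresh.add("branch=b");
        forwardWithCopy(fresh, copied);
        System.out.println("with copies:  " + copied);
    }
}
```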

Re: [DISCUSS] KIP-634: Complementary support for headers in Kafka Streams DSL

Posted by John Roesler <vv...@apache.org>.
Thanks for the update, Jorge,

I've just looked over the KIP again. Just one more small
concern:

5) We can't just change the type of Record#headers() to a
new fully qualified type. That would be a source-
incompatible breaking change for users.

Our options are:
* Deprecate the existing method and create a new one with
the new type
* If the existing Headers is "not great but ok", then maybe
we leave it alone.
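
The first option could look roughly like the sketch below. Everything in it is
a hypothetical stand-in: `LegacyHeaders` plays the role of the existing headers
type, `StreamsHeaders` the type proposed in the KIP, `SketchRecord` the `Record`
class, and `recordHeaders()` is an invented method name. Since Java cannot
overload a method on its return type alone, the new accessor needs a new name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the existing and the proposed header types.
final class LegacyHeaders {
    final List<String> names = new ArrayList<>();
}

final class StreamsHeaders {
    private final List<String> names;

    StreamsHeaders(List<String> names) {
        this.names = List.copyOf(names); // immutable snapshot of the header names
    }

    List<String> names() {
        return names;
    }
}

final class SketchRecord {
    private final LegacyHeaders headers;

    SketchRecord(LegacyHeaders headers) {
        this.headers = headers;
    }

    /** Existing accessor: the signature is unchanged, so current callers still compile. */
    @Deprecated
    LegacyHeaders headers() {
        return headers;
    }

    /** New accessor under a new (hypothetical) name, returning the new type. */
    StreamsHeaders recordHeaders() {
        return new StreamsHeaders(headers.names);
    }
}
```

Existing code that calls `headers()` keeps compiling and only sees a
deprecation warning, which is what makes this path source-compatible.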

Thanks,
-John


On Fri, 2022-02-11 at 20:40 +0000, Jorge Esteban Quilcate
Otoya wrote:
> John and team,
> 
> The following changes have been applied to the KIP following your feedback:
> 
> - Leverage `Record<K, V>` instead of introducing a new type
> (`RecordValue<V>`).
> - `RecordSerde<K, V>` for stateful operations using `Record<K, V>` as value.
> - Extend `Record<K, V>` to:
>   - Implement `RecordMetadata` to expose `topic`, `partition`, and `offset`
>   - Use the `Headers` abstraction introduced in this KIP instead of the core one
> 
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> 
> Looking forward to your feedback.
> 
> Have a great weekend!
> 
> On Thu, 10 Feb 2022 at 13:15, Jorge Esteban Quilcate Otoya <
> quilcate.jorge@gmail.com> wrote:
> 
> > > What do you think about instead adding topic and
> > partition to Record?
> > 
> > This is a very interesting idea. Forgot to consider this addition from
> > KIP-478.
> > 
> > `Record` would also require `offset`. Maybe implementing `RecordMetadata`
> > and adding these fields as part of the constructor to keep them immutable
> > in comparison to the other fields?
> > It would also need to change `Record`'s headers type to the new one
> > proposed on this KIP.
> > 
> > Let me explore this approach in more detail and update the KIP.
> > 
> > > I find the name "mapRecordValue" to be a bit confusing
> >   because it seems like it would map the value of a record.
> >   What do you think about "mapValueToRecord" instead?
> > 
> > Agree. It will depend on how we solve 1). If we end up using `Record` then
> > `mapValueToRecord` will make even more sense.
> > 
> > > I agree with adding the serde explicitly. However, it
> > would be good to state whether and when we'll automatically
> > wrap a value serde. For example, if the value serde is known
> > (or if we're using a default serde from the config), will
> > Streams automatically wrap it downstream of the record-
> > mapping operator?
> > 
> > Good point. The goal is as you describe it: only when `mapValueToRecord`
> > is called, the Serde will be implicitly applied.
> > Will make this explicit on the KIP.
> > 
> > 
> > On Wed, 9 Feb 2022 at 20:05, John Roesler <vv...@apache.org> wrote:
> > 
> > > Hello Jorge,
> > > 
> > > Thanks for bringing this up again!
> > > 
> > > I've just read over the current version of the KIP.
> > > 
> > > 1) I wonder if we really need RecordValue, since we now have
> > > Record, and they are almost the same, both in API and in
> > > purpose. What do you think about instead adding topic and
> > > partition to Record?
> > > 
> > > 2) I find the name "mapRecordValue" to be a bit confusing
> > > because it seems like it would map the value of a record.
> > > What do you think about "mapValueToRecord" instead?
> > > 
> > > 3) I agree with adding the serde explicitly. However, it
> > > would be good to state whether and when we'll automatically
> > > wrap a value serde. For example, if the value serde is known
> > > (or if we're using a default serde from the config), will
> > > Streams automatically wrap it downstream of the record-
> > > mapping operator?
> > > 
> > > Otherwise, your proposal looks good to me!
> > > 
> > > Thanks,
> > > -John
> > > 
> > > On Tue, 2022-02-08 at 18:06 +0000, Jorge Esteban Quilcate
> > > Otoya wrote:
> > > > Hi Dev team,
> > > > 
> > > > I'd like to revamp the KIP again:
> > > > 
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> > > > 
> > > > - Reference implementation is now using the latest `Processor` API from
> > > > KIP-478: https://github.com/apache/kafka/pull/10265/files for both
> > > > Processors backing changes on the KStream API.
> > > > - It is proposing to still extend `To` class for backwards
> > > compatibility.
> > > > 
> > > > Looking forward to your feedback.
> > > > 
> > > > Regards,
> > > > Jorge.
> > > > 
> > > > On Thu, 4 Mar 2021 at 18:38, Jorge Esteban Quilcate Otoya <
> > > > quilcate.jorge@gmail.com> wrote:
> > > > 
> > > > > Hi everyone!
> > > > > 
> > > > > I'd like to revamp this KIP. I have made some significant changes on
> > > the
> > > > > scope:
> > > > > - Added `mapRecordValue` to map not only headers, but other record
> > > > > metadata: topic name, partition, offset, and timestamp into a new type
> > > > > `RecordValue<V>`.
> > > > > - Added a serde for `RecordValue` to support stateful operations.
> > > > > - Added `setRecordHeaders` to apply headers to record crossing the
> > > stream.
> > > > > - Added headers to `To` to update headers via `context.forward(k, v,
> > > to)`.
> > > > > 
> > > > > New link:
> > > > > 
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> > > > > 
> > > > > Looking forward to your feedback,
> > > > > 
> > > > > Cheers and stay safe,
> > > > > Jorge.
> > > > > 
> > > > > On Thu, Oct 29, 2020 at 12:33 AM Jorge Esteban Quilcate Otoya <
> > > > > quilcate.jorge@gmail.com> wrote:
> > > > > 
> > > > > > Thanks Sophie! Haven't followed KIP-478 but sounds great.
> > > > > > I'll be happy to help on that migration to the new PAPI if it's
> > > still an
> > > > > > open issue. We can bump this KIP after that.
> > > > > > 
> > > > > > Cheers,
> > > > > > Jorge.
> > > > > > 
> > > > > > On Wed, Oct 28, 2020 at 7:00 PM Sophie Blee-Goldman <
> > > sophie@confluent.io>
> > > > > > wrote:
> > > > > > 
> > > > > > > I *think* that the `To` Matthias was referring to was not
> > > KStream#to but
> > > > > > > the To class
> > > > > > > which is accepted as a possible parameter of
> > > ProcessorContext#forward
> > > > > > > (correct
> > > > > > > me if wrong).
> > > > > > > 
> > > > > > > This was on the old ProcessorContext interface, which has now been
> > > > > > > replaced with
> > > > > > > the new api.ProcessorContext in KIP-478. In the new interface
> > > we've moved
> > > > > > > away
> > > > > > > from the forward signatures that accept a separate key or value or
> > > > > > > timestamp or To,
> > > > > > > and wrapped all of these into a single Record class. This new
> > > Record
> > > > > > > class
> > > > > > > has the
> > > > > > > headers as a field, so it seems like KIP-478 has happened to
> > > solve the
> > > > > > > lack
> > > > > > > of support
> > > > > > > for Headers in the PAPI along the way.
> > > > > > > 
> > > > > > > This is all somewhat recent, and probably wasn't yet sorted out
> > > at the
> > > > > > > time
> > > > > > > of Matthias'
> > > > > > > last reply. But given how this worked out it seems like we can
> > > just focus
> > > > > > > on adding
> > > > > > > support for Headers in the DSL in this KIP by building off of the
> > > > > > > groundwork of
> > > > > > > KIP-478? It doesn't seem necessary to go back and add support for
> > > headers
> > > > > > > in the old
> > > > > > > PAPI, since this will (or already has?) been deprecated.
> > > > > > > 
> > > > > > > The one challenge is that this will presumably require that we
> > > migrate
> > > > > > > all
> > > > > > > DSL operators
> > > > > > > to the new PAPI before adding header support for those operators.
> > > But
> > > > > > > that
> > > > > > > definitely
> > > > > > > sounds achievable here
> > > > > > > 
> > > > > > > On Wed, Oct 28, 2020 at 11:10 AM Jorge Esteban Quilcate Otoya <
> > > > > > > quilcate.jorge@gmail.com> wrote:
> > > > > > > 
> > > > > > > > Hi Matthias,
> > > > > > > > 
> > > > > > > > Sorry for the late reply.
> > > > > > > > 
> > > > > > > > I like the proposal. Just to check if I got it right:
> > > > > > > > 
> > > > > > > > We can extend the `kstream.to()` function to support setting
> > > headers.
> > > > > > > > e.g.:
> > > > > > > > 
> > > > > > > > ```
> > > > > > > >     void to(final String topic,
> > > > > > > >             final Produced<K, V> produced,
> > > > > > > >             final HeadersExtractor<K, V> headersExtractor);
> > > > > > > > ```
> > > > > > > > 
> > > > > > > > where `HeadersExtractor`:
> > > > > > > > 
> > > > > > > > ```
> > > > > > > > public interface HeadersExtractor<K, V> {
> > > > > > > >     Headers extract(final K key, final V value, final
> > > RecordContext
> > > > > > > > recordContext);
> > > > > > > > }
> > > > > > > > ```
> > > > > > > > 
> > > > > > > >  This would require to change `Topology#addSink()` to support
> > > this
> > > > > > > > extractor as well.
> > > > > > > > 
> > > > > > > > If this is aligned with your proposal, I'm happy to add it to
> > > this KIP.
> > > > > > > > 
> > > > > > > > Cheers,
> > > > > > > > Jorge.
> > > > > > > > 
> > > > > > > > On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <
> > > mjsax@apache.org>
> > > > > > > wrote:
> > > > > > > > 
> > > > > > > > > Jorge,
> > > > > > > > > 
> > > > > > > > > thanks a lot for this KIP. Being able to modify headers is a
> > > very
> > > > > > > > > valuable feature.
> > > > > > > > > 
> > > > > > > > > However, before we actually expose them in the DSL, I am
> > > wondering
> > > > > > > if we
> > > > > > > > > should improve how headers can be modified in the PAPI?
> > > Currently,
> > > > > > > it is
> > > > > > > > > possible but very clumsy to work with headers in the
> > > Processor API,
> > > > > > > > > because of two reasons:
> > > > > > > > > 
> > > > > > > > >  (1) There is no default `Headers` implementation in the
> > > public API
> > > > > > > > >  (2) There is no explicit way to set headers for output
> > > records
> > > > > > > > > 
> > > > > > > > > Currently, the input record headers are copied into the output
> > > > > > > records
> > > > > > > > > when `forward()` is called, however, it's not really a deep
> > > copy but
> > > > > > > we
> > > > > > > > > just copy the reference. This implies that one needs to work
> > > with a
> > > > > > > > > single mutable object that flows through multiple processors
> > > making
> > > > > > > it
> > > > > > > > > very error prone.
> > > > > > > > > 
> > > > > > > > > Furthermore, if you want to emit multiple output records, and
> > > for
> > > > > > > > > example want to add two different headers to the output record
> > > > > > > (based on
> > > > > > > > > the same input headers), you would need to do something like
> > > this:
> > > > > > > > > 
> > > > > > > > >   Headers h = context.headers();
> > > > > > > > >   h.add(...);
> > > > > > > > >   context.forward(...);
> > > > > > > > >   // remove the header you added for the first output record
> > > > > > > > >   h.remove(...);
> > > > > > > > >   h.add(...);
> > > > > > > > >   context.forward(...);
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > Maybe we could extend `To` to allow passing in a new
> > > `Headers` object
> > > > > > > > > (or an `Iterable<Header>` similar to `ProducerRecord`)? We
> > > could
> > > > > > > either
> > > > > > > > > add it to your KIP or do a new KIP just for the PAPI.
> > > > > > > > > 
> > > > > > > > > Thoughts?
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > -Matthias
> > > > > > > > > 
> > > > > > > > > On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > > > > Hi everyone,
> > > > > > > > > > 
> > > > > > > > > > Bumping this thread to check if there's any feedback.
> > > > > > > > > > 
> > > > > > > > > > Cheers,
> > > > > > > > > > Jorge.
> > > > > > > > > > 
> > > > > > > > > > On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate
> > > Otoya <
> > > > > > > > > > quilcate.jorge@gmail.com> wrote:
> > > > > > > > > > 
> > > > > > > > > > > Hi everyone,
> > > > > > > > > > > 
> > > > > > > > > > > I would like to start the discussion for KIP-634:
> > > > > > > > > 
> > > > > > > > 
> > > > > > > 
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
> > > > > > > > > > > 
> > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > > 
> > > > > > > > > > > Thanks!
> > > > > > > > > > > Jorge.
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > 
> > > > > > > 
> > > > > > 
> > > 
> > > 


Re: [DISCUSS] KIP-634: Complementary support for headers in Kafka Streams DSL

Posted by Jorge Esteban Quilcate Otoya <qu...@gmail.com>.
John and team,

The following changes have been applied to the KIP following your feedback:

- Leverage `Record<K, V>` instead of introducing a new type
(`RecordValue<V>`).
- Add `RecordSerde<K, V>` for stateful operations that use `Record<K, V>`
as the value.
- Extend `Record<K, V>` to:
  - Implement `RecordMetadata` to expose `topic`, `partition`, and `offset`
  - Use the `Headers` abstraction introduced in this KIP instead of the core one
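
As a rough illustration of the intent (a sketch only: `SketchRecord` and
`filterByRecord` are made-up stand-ins that run without Kafka, and headers are
simplified to a string map; none of this is the KIP's actual API), once the
whole record becomes the value, a plain predicate can read headers and
metadata:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for a Record<K, V> that also exposes metadata.
final class SketchRecord<K, V> {
    final K key;
    final V value;
    final Map<String, String> headers; // simplified: header name -> string value
    final String topic;
    final int partition;
    final long offset;

    SketchRecord(K key, V value, Map<String, String> headers,
                 String topic, int partition, long offset) {
        this.key = key;
        this.value = value;
        this.headers = Map.copyOf(headers); // immutable copy
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

final class MapValueToRecordSketch {
    // Emulates, on a plain list, a filter placed after a record-mapping step:
    // the predicate receives the full record, not just the value.
    static <K, V> List<SketchRecord<K, V>> filterByRecord(
            List<SketchRecord<K, V>> input,
            Predicate<SketchRecord<K, V>> predicate) {
        List<SketchRecord<K, V>> out = new ArrayList<>();
        for (SketchRecord<K, V> r : input) {
            if (predicate.test(r)) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<SketchRecord<String, String>> input = List.of(
            new SketchRecord<>("k1", "v1", Map.of("source", "web"), "orders", 0, 10L),
            new SketchRecord<>("k2", "v2", Map.of("source", "batch"), "orders", 1, 11L));

        // Keep only records whose (hypothetical) "source" header equals "web".
        List<SketchRecord<String, String>> fromWeb =
            filterByRecord(input, r -> "web".equals(r.headers.get("source")));

        System.out.println(fromWeb.size() + " record(s), first key = " + fromWeb.get(0).key);
    }
}
```

The same predicate could just as well check `topic`, `partition`, or `offset`,
which is the main reason for exposing them on the record.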

KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL

Looking forward to your feedback.

Have a great weekend!

On Thu, 10 Feb 2022 at 13:15, Jorge Esteban Quilcate Otoya <
quilcate.jorge@gmail.com> wrote:

> > What do you think about instead adding topic and
> > partition to Record?
>
> This is a very interesting idea. I forgot to consider this addition from
> KIP-478.
>
> `Record` would also require `offset`. Maybe it could implement `RecordMetadata`
> and take these fields through the constructor, to keep them immutable
> in contrast to the other fields?
> It would also need to change `Record`'s headers type to the new one
> proposed in this KIP.
>
> Let me explore this approach in more detail and update the KIP.
>
> > I find the name "mapRecordValue" to be a bit confusing
> > because it seems like it would map the value of a record.
> > What do you think about "mapValueToRecord" instead?
>
> Agreed. It will depend on how we solve 1). If we end up using `Record` then
> `mapValueToRecord` will make even more sense.
>
> > I agree with adding the serde explicitly. However, it
> > would be good to state whether and when we'll automatically
> > wrap a value serde. For example, if the value serde is known
> > (or if we're using a default serde from the config), will
> > Streams automatically wrap it downstream of the record-
> > mapping operator?
>
> Good point. The goal is as you describe it: the Serde will be applied
> implicitly only when `mapValueToRecord` is called.
> I will make this explicit in the KIP.
>
>
> On Wed, 9 Feb 2022 at 20:05, John Roesler <vv...@apache.org> wrote:
>
>> Hello Jorge,
>>
>> Thanks for bringing this up again!
>>
>> I've just read over the current version of the KIP.
>>
>> 1) I wonder if we really need RecordValue, since we now have
>> Record, and they are almost the same, both in API and in
>> purpose. What do you think about instead adding topic and
>> partition to Record?
>>
>> 2) I find the name "mapRecordValue" to be a bit confusing
>> because it seems like it would map the value of a record.
>> What do you think about "mapValueToRecord" instead?
>>
>> 3) I agree with adding the serde explicitly. However, it
>> would be good to state whether and when we'll automatically
>> wrap a value serde. For example, if the value serde is known
>> (or if we're using a default serde from the config), will
>> Streams automatically wrap it downstream of the record-
>> mapping operator?
>>
>> Otherwise, your proposal looks good to me!
>>
>> Thanks,
>> -John
>>
>> On Tue, 2022-02-08 at 18:06 +0000, Jorge Esteban Quilcate
>> Otoya wrote:
>> > Hi Dev team,
>> >
>> > I'd like to revamp the KIP again:
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
>> >
>> > - Reference implementation is now using the latest `Processor` API from
>> > KIP-478: https://github.com/apache/kafka/pull/10265/files for both
>> > Processors backing changes on the KStream API.
>> > - It is proposing to still extend `To` class for backwards
>> compatibility.
>> >
>> > Looking forward to your feedback.
>> >
>> > Regards,
>> > Jorge.
>> >
>> > On Thu, 4 Mar 2021 at 18:38, Jorge Esteban Quilcate Otoya <
>> > quilcate.jorge@gmail.com> wrote:
>> >
>> > > Hi everyone!
>> > >
>> > > I'd like to revamp this KIP. I have made some significant changes on
>> the
>> > > scope:
>> > > - Added `mapRecordValue` to map not only headers, but other record
>> > > metadata: topic name, partition, offset, and timestamp into a new type
>> > > `RecordValue<V>`.
>> > > - Added a serde for `RecordValue` to support stateful operations.
>> > > - Added `setRecordHeaders` to apply headers to record crossing the
>> stream.
>> > > - Added headers to `To` to update headers via `context.forward(k, v,
>> to)`.
>> > >
>> > > New link:
>> > >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
>> > >
>> > > Looking forward to your feedback,
>> > >
>> > > Cheers and stay safe,
>> > > Jorge.
>> > >
>> > > On Thu, Oct 29, 2020 at 12:33 AM Jorge Esteban Quilcate Otoya <
>> > > quilcate.jorge@gmail.com> wrote:
>> > >
>> > > > Thanks Sophie! Haven't followed KIP-478 but sounds great.
>> > > > I'll be happy to help on that migration to the new PAPI if it's
>> still an
>> > > > open issue. We can bump this KIP after that.
>> > > >
>> > > > Cheers,
>> > > > Jorge.
>> > > >
>> > > > On Wed, Oct 28, 2020 at 7:00 PM Sophie Blee-Goldman <
>> sophie@confluent.io>
>> > > > wrote:
>> > > >
>> > > > > I *think* that the `To` Matthias was referring to was not
>> KStream#to but
>> > > > > the To class
>> > > > > which is accepted as a possible parameter of
>> ProcessorContext#forward
>> > > > > (correct
>> > > > > me if wrong).
>> > > > >
>> > > > > This was on the old ProcessorContext interface, which has now been
>> > > > > replaced with
>> > > > > the new api.ProcessorContext in KIP-478. In the new interface
>> we've moved
>> > > > > away
>> > > > > from the forward signatures that accept a separate key or value or
>> > > > > timestamp or To,
>> > > > > and wrapped all of these into a single Record class. This new
>> Record
>> > > > > class
>> > > > > has the
>> > > > > headers as a field, so it seems like KIP-478 has happened to
>> solve the
>> > > > > lack
>> > > > > of support
>> > > > > for Headers in the PAPI along the way.
>> > > > >
>> > > > > This is all somewhat recent, and probably wasn't yet sorted out
>> at the
>> > > > > time
>> > > > > of Matthias'
>> > > > > last reply. But given how this worked out it seems like we can
>> just focus
>> > > > > on adding
>> > > > > support for Headers in the DSL in this KIP by building off of the
>> > > > > groundwork of
>> > > > > KIP-478? It doesn't seem necessary to go back and add support for
>> headers
>> > > > > in the old
>> > > > > PAPI, since this will (or already has?) been deprecated.
>> > > > >
>> > > > > The one challenge is that this will presumably require that we
>> migrate
>> > > > > all
>> > > > > DSL operators
>> > > > > to the new PAPI before adding header support for those operators.
>> But
>> > > > > that
>> > > > > definitely
>> > > > > sounds achievable here
>> > > > >
>> > > > > On Wed, Oct 28, 2020 at 11:10 AM Jorge Esteban Quilcate Otoya <
>> > > > > quilcate.jorge@gmail.com> wrote:
>> > > > >
>> > > > > > Hi Matthias,
>> > > > > >
>> > > > > > Sorry for the late reply.
>> > > > > >
>> > > > > > I like the proposal. Just to check if I got it right:
>> > > > > >
>> > > > > > We can extend the `kstream.to()` function to support setting
>> headers.
>> > > > > > e.g.:
>> > > > > >
>> > > > > > ```
>> > > > > >     void to(final String topic,
>> > > > > >             final Produced<K, V> produced,
>> > > > > >             final HeadersExtractor<K, V> headersExtractor);
>> > > > > > ```
>> > > > > >
>> > > > > > where `HeadersExtractor`:
>> > > > > >
>> > > > > > ```
>> > > > > > public interface HeadersExtractor<K, V> {
>> > > > > >     Headers extract(final K key, final V value, final
>> RecordContext
>> > > > > > recordContext);
>> > > > > > }
>> > > > > > ```
>> > > > > >
>> > > > > >  This would require to change `Topology#addSink()` to support
>> this
>> > > > > > extractor as well.
>> > > > > >
>> > > > > > If this is aligned with your proposal, I'm happy to add it to
>> this KIP.
>> > > > > >
>> > > > > > Cheers,
>> > > > > > Jorge.
>> > > > > >
>> > > > > > On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <
>> mjsax@apache.org>
>> > > > > wrote:
>> > > > > >
>> > > > > > > Jorge,
>> > > > > > >
>> > > > > > > thanks a lot for this KIP. Being able to modify headers is a
>> very
>> > > > > > > valuable feature.
>> > > > > > >
>> > > > > > > However, before we actually expose them in the DSL, I am
>> wondering
>> > > > > if we
>> > > > > > > should improve how headers can be modified in the PAPI?
>> Currently,
>> > > > > it is
>> > > > > > > possible but very clumsy to work with headers in the
>> Processor API,
>> > > > > > > because of two reasons:
>> > > > > > >
>> > > > > > >  (1) There is no default `Headers` implementation in the
>> public API
>> > > > > > >  (2) There is no explicit way to set headers for output
>> records
>> > > > > > >
>> > > > > > > Currently, the input record headers are copied into the output
>> > > > > records
>> > > > > > > when `forward()` is called, however, it's not really a deep
>> copy but
>> > > > > we
>> > > > > > > just copy the reference. This implies that one needs to work
>> with a
>> > > > > > > single mutable object that flows through multiple processors
>> making
>> > > > > it
>> > > > > > > very error prone.
>> > > > > > >
>> > > > > > > Furthermore, if you want to emit multiple output records, and
>> for
>> > > > > > > example want to add two different headers to the output record
>> > > > > (based on
>> > > > > > > the same input headers), you would need to do something like
>> this:
>> > > > > > >
>> > > > > > >   Headers h = context.headers();
>> > > > > > >   h.add(...);
>> > > > > > >   context.forward(...);
>> > > > > > >   // remove the header you added for the first output record
>> > > > > > >   h.remove(...);
>> > > > > > >   h.add(...);
>> > > > > > >   context.forward(...);
>> > > > > > >
>> > > > > > >
>> > > > > > > Maybe we could extend `To` to allow passing in a new
>> `Headers` object
>> > > > > > > (or an `Iterable<Header>` similar to `ProducerRecord`)? We
>> could
>> > > > > either
>> > > > > > > add it to your KIP or do a new KIP just for the PAPI.
>> > > > > > >
>> > > > > > > Thoughts?
>> > > > > > >
>> > > > > > >
>> > > > > > > -Matthias
>> > > > > > >
>> > > > > > > On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
>> > > > > > > > Hi everyone,
>> > > > > > > >
>> > > > > > > > Bumping this thread to check if there's any feedback.
>> > > > > > > >
>> > > > > > > > Cheers,
>> > > > > > > > Jorge.
>> > > > > > > >
>> > > > > > > > On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate
>> Otoya <
>> > > > > > > > quilcate.jorge@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > > Hi everyone,
>> > > > > > > > >
>> > > > > > > > > I would like to start the discussion for KIP-634:
>> > > > > > >
>> > > > > >
>> > > > >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
>> > > > > > > > >
>> > > > > > > > > Looking forward to your feedback.
>> > > > > > > > >
>> > > > > > > > > Thanks!
>> > > > > > > > > Jorge.
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>>
>>

Re: [DISCUSS] KIP-634: Complementary support for headers in Kafka Streams DSL

Posted by Jorge Esteban Quilcate Otoya <qu...@gmail.com>.
> What do you think about instead adding topic and
> partition to Record?

This is a very interesting idea. I forgot to consider this addition from
KIP-478.

`Record` would also require `offset`. Maybe it could implement `RecordMetadata`
and take these fields through the constructor, to keep them immutable
in contrast to the other fields?
It would also need to change `Record`'s headers type to the new one
proposed in this KIP.

Let me explore this approach in more detail and update the KIP.
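
A minimal sketch of that direction, to make the idea concrete. Everything here
is a placeholder (`SketchRecordMetadata`, `MetaRecord`, and the string-map
headers are assumptions for illustration, not the existing `Record` class or
the final KIP API): metadata is fixed at construction, while the data fields
are replaced through copying `with*` methods.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical read-only view of where a record came from.
interface SketchRecordMetadata {
    String topic();
    int partition();
    long offset();
}

// Hypothetical Record variant: topic/partition/offset can only be set via the
// constructor; value and headers are changed by creating a modified copy.
final class MetaRecord<K, V> implements SketchRecordMetadata {
    private final K key;
    private final V value;
    private final long timestamp;
    private final Map<String, String> headers; // simplified headers type
    private final String topic;
    private final int partition;
    private final long offset;

    MetaRecord(K key, V value, long timestamp, Map<String, String> headers,
               String topic, int partition, long offset) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = new LinkedHashMap<>(headers); // defensive copy
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    K key() { return key; }
    V value() { return value; }
    long timestamp() { return timestamp; }
    Map<String, String> headers() { return Collections.unmodifiableMap(headers); }

    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public long offset() { return offset; }

    // Replaces the value; metadata is carried over unchanged.
    <NewV> MetaRecord<K, NewV> withValue(NewV newValue) {
        return new MetaRecord<>(key, newValue, timestamp, headers, topic, partition, offset);
    }

    // Adds or overwrites one header on a copy; the original record is untouched.
    MetaRecord<K, V> withHeader(String name, String headerValue) {
        Map<String, String> copy = new LinkedHashMap<>(headers);
        copy.put(name, headerValue);
        return new MetaRecord<>(key, value, timestamp, copy, topic, partition, offset);
    }
}

final class MetaRecordDemo {
    public static void main(String[] args) {
        MetaRecord<String, String> in =
            new MetaRecord<>("k", "v", 42L, Map.of(), "input-topic", 3, 100L);
        MetaRecord<String, Integer> out = in.withValue(1).withHeader("trace", "abc");
        System.out.println(out.topic() + "-" + out.partition() + "@" + out.offset()
            + " headers=" + out.headers());
    }
}
```

Since there is no setter for the metadata, a processor cannot accidentally
rewrite where a record came from, and copying the headers avoids sharing one
mutable object between several output records.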

> I find the name "mapRecordValue" to be a bit confusing
> because it seems like it would map the value of a record.
> What do you think about "mapValueToRecord" instead?

Agreed. It will depend on how we solve 1). If we end up using `Record` then
`mapValueToRecord` will make even more sense.

> I agree with adding the serde explicitly. However, it
> would be good to state whether and when we'll automatically
> wrap a value serde. For example, if the value serde is known
> (or if we're using a default serde from the config), will
> Streams automatically wrap it downstream of the record-
> mapping operator?

Good point. The goal is as you describe it: the Serde will be applied
implicitly only when `mapValueToRecord` is called.
I will make this explicit in the KIP.
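
To illustrate what "wrapping" a value serde could mean, here is a small
stand-alone sketch. It is an assumption for discussion, not the KIP's
`RecordSerde`: `WrappedValueSerde` is a made-up name, plain functions stand in
for the inner serializer/deserializer, and only the timestamp is stored next
to the value (headers are left out to keep it short).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical wrapper around an already-known value serde: it prefixes the
// serialized value with the record timestamp so both survive a state store.
final class WrappedValueSerde<V> {
    private final Function<V, byte[]> innerSerializer;
    private final Function<byte[], V> innerDeserializer;

    WrappedValueSerde(Function<V, byte[]> innerSerializer,
                      Function<byte[], V> innerDeserializer) {
        this.innerSerializer = innerSerializer;
        this.innerDeserializer = innerDeserializer;
    }

    // Layout: 8 bytes timestamp, followed by the inner serializer's bytes.
    byte[] serialize(long timestamp, V value) {
        byte[] valueBytes = innerSerializer.apply(value);
        return ByteBuffer.allocate(Long.BYTES + valueBytes.length)
            .putLong(timestamp)
            .put(valueBytes)
            .array();
    }

    long timestampOf(byte[] data) {
        return ByteBuffer.wrap(data).getLong();
    }

    V valueOf(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        buffer.getLong(); // skip the timestamp prefix
        byte[] valueBytes = new byte[buffer.remaining()];
        buffer.get(valueBytes);
        return innerDeserializer.apply(valueBytes);
    }
}

final class WrappedValueSerdeDemo {
    public static void main(String[] args) {
        // The "known" value serde here is a plain UTF-8 string codec.
        WrappedValueSerde<String> serde = new WrappedValueSerde<>(
            s -> s.getBytes(StandardCharsets.UTF_8),
            b -> new String(b, StandardCharsets.UTF_8));

        byte[] stored = serde.serialize(1644523200000L, "hello");
        System.out.println(serde.timestampOf(stored) + " " + serde.valueOf(stored));
    }
}
```

The point of the sketch is only that the wrapper needs nothing beyond the
inner value serde, which is why it could be applied implicitly whenever that
serde is known or configured as the default.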


On Wed, 9 Feb 2022 at 20:05, John Roesler <vv...@apache.org> wrote:

> Hello Jorge,
>
> Thanks for bringing this up again!
>
> I've just read over the current version of the KIP.
>
> 1) I wonder if we really need RecordValue, since we now have
> Record, and they are almost the same, both in API and in
> purpose. What do you think about instead adding topic and
> partition to Record?
>
> 2) I find the name "mapRecordValue" to be a bit confusing
> because it seems like it would map the value of a record.
> What do you think about "mapValueToRecord" instead?
>
> 3) I agree with adding the serde explicitly. However, it
> would be good to state whether and when we'll automatically
> wrap a value serde. For example, if the value serde is known
> (or if we're using a default serde from the config), will
> Streams automatically wrap it downstream of the record-
> mapping operator?
>
> Otherwise, your proposal looks good to me!
>
> Thanks,
> -John
>
> On Tue, 2022-02-08 at 18:06 +0000, Jorge Esteban Quilcate
> Otoya wrote:
> > Hi Dev team,
> >
> > I'd like to revamp the KIP again:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> >
> > - Reference implementation is now using the latest `Processor` API from
> > KIP-478: https://github.com/apache/kafka/pull/10265/files for both
> > Processors backing changes on the KStream API.
> > - It is proposing to still extend `To` class for backwards compatibility.
> >
> > Looking forward to your feedback.
> >
> > Regards,
> > Jorge.
> >
> > On Thu, 4 Mar 2021 at 18:38, Jorge Esteban Quilcate Otoya <
> > quilcate.jorge@gmail.com> wrote:
> >
> > > Hi everyone!
> > >
> > > I'd like to revamp this KIP. I have made some significant changes on
> the
> > > scope:
> > > - Added `mapRecordValue` to map not only headers, but other record
> > > metadata: topic name, partition, offset, and timestamp into a new type
> > > `RecordValue<V>`.
> > > - Added a serde for `RecordValue` to support stateful operations.
> > > - Added `setRecordHeaders` to apply headers to record crossing the
> stream.
> > > - Added headers to `To` to update headers via `context.forward(k, v,
> to)`.
> > >
> > > New link:
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> > >
> > > Looking forward to your feedback,
> > >
> > > Cheers and stay safe,
> > > Jorge.
> > >
> > > On Thu, Oct 29, 2020 at 12:33 AM Jorge Esteban Quilcate Otoya <
> > > quilcate.jorge@gmail.com> wrote:
> > >
> > > > Thanks Sophie! Haven't followed KIP-478 but sounds great.
> > > > I'll be happy to help on that migration to the new PAPI if it's
> still an
> > > > open issue. We can bump this KIP after that.
> > > >
> > > > Cheers,
> > > > Jorge.
> > > >
> > > > On Wed, Oct 28, 2020 at 7:00 PM Sophie Blee-Goldman <
> sophie@confluent.io>
> > > > wrote:
> > > >
> > > > > I *think* that the `To` Matthias was referring to was not
> KStream#to but
> > > > > the To class
> > > > > which is accepted as a possible parameter of
> ProcessorContext#forward
> > > > > (correct
> > > > > me if wrong).
> > > > >
> > > > > This was on the old ProcessorContext interface, which has now been
> > > > > replaced with
> > > > > the new api.ProcessorContext in KIP-478. In the new interface
> we've moved
> > > > > away
> > > > > from the forward signatures that accept a separate key or value or
> > > > > timestamp or To,
> > > > > and wrapped all of these into a single Record class. This new
> Record
> > > > > class
> > > > > has the
> > > > > headers as a field, so it seems like KIP-478 has happened to solve
> the
> > > > > lack
> > > > > of support
> > > > > for Headers in the PAPI along the way.
> > > > >
> > > > > This is all somewhat recent, and probably wasn't yet sorted out at
> the
> > > > > time
> > > > > of Matthias'
> > > > > last reply. But given how this worked out it seems like we can
> just focus
> > > > > on adding
> > > > > support for Headers in the DSL in this KIP by building off of the
> > > > > groundwork of
> > > > > KIP-478? It doesn't seem necessary to go back and add support for
> headers
> > > > > in the old
> > > > > PAPI, since this will (or already has?) been deprecated.
> > > > >
> > > > > The one challenge is that this will presumably require that we
> migrate
> > > > > all
> > > > > DSL operators
> > > > > to the new PAPI before adding header support for those operators.
> But
> > > > > that
> > > > > definitely
> > > > > sounds achievable here
> > > > >
> > > > > On Wed, Oct 28, 2020 at 11:10 AM Jorge Esteban Quilcate Otoya <
> > > > > quilcate.jorge@gmail.com> wrote:
> > > > >
> > > > > > Hi Matthias,
> > > > > >
> > > > > > Sorry for the late reply.
> > > > > >
> > > > > > I like the proposal. Just to check if I got it right:
> > > > > >
> > > > > > We can extend the `kstream.to()` function to support setting
> headers.
> > > > > > e.g.:
> > > > > >
> > > > > > ```
> > > > > >     void to(final String topic,
> > > > > >             final Produced<K, V> produced,
> > > > > >             final HeadersExtractor<K, V> headersExtractor);
> > > > > > ```
> > > > > >
> > > > > > where `HeadersExtractor`:
> > > > > >
> > > > > > ```
> > > > > > public interface HeadersExtractor<K, V> {
> > > > > >     Headers extract(final K key, final V value, final
> RecordContext
> > > > > > recordContext);
> > > > > > }
> > > > > > ```
> > > > > >
> > > > > >  This would require to change `Topology#addSink()` to support
> this
> > > > > > extractor as well.
> > > > > >
> > > > > > If this is aligned with your proposal, I'm happy to add it to
> this KIP.
> > > > > >
> > > > > > Cheers,
> > > > > > Jorge.
> > > > > >
> > > > > > On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <
> mjsax@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > > Jorge,
> > > > > > >
> > > > > > > thanks a lot for this KIP. Being able to modify headers is a
> very
> > > > > > > valuable feature.
> > > > > > >
> > > > > > > However, before we actually expose them in the DSL, I am
> wondering
> > > > > if we
> > > > > > > should improve how headers can be modified in the PAPI?
> Currently,
> > > > > it is
> > > > > > > possible but very clumsy to work with headers in the Processor
> API,
> > > > > > > because of two reasons:
> > > > > > >
> > > > > > >  (1) There is no default `Headers` implementation in the
> public API
> > > > > > >  (2) There is no explicit way to set headers for output records
> > > > > > >
> > > > > > > Currently, the input record headers are copied into the output
> > > > > records
> > > > > > > when `forward()` is called, however, it's not really a deep
> copy but
> > > > > we
> > > > > > > just copy the reference. This implies that one needs to work
> with a
> > > > > > > single mutable object that flows through multiple processors
> making
> > > > > it
> > > > > > > very error prone.
> > > > > > >
> > > > > > > Furthermore, if you want to emit multiple output records, and
> for
> > > > > > > example want to add two different headers to the output record
> > > > > (based on
> > > > > > > the same input headers), you would need to do something like
> this:
> > > > > > >
> > > > > > >   Headers h = context.headers();
> > > > > > >   h.add(...);
> > > > > > >   context.forward(...);
> > > > > > >   // remove the header you added for the first output record
> > > > > > >   h.remove(...);
> > > > > > >   h.add(...);
> > > > > > >   context.forward(...);
> > > > > > >
> > > > > > >
> > > > > > > Maybe we could extend `To` to allow passing in a new `Headers`
> object
> > > > > > > (or an `Iterable<Header>` similar to `ProducerRecord`)? We
> could
> > > > > either
> > > > > > > add it to your KIP or do a new KIP just for the PAPI.
> > > > > > >
> > > > > > > Thoughts?
> > > > > > >
> > > > > > >
> > > > > > > -Matthias
> > > > > > >
> > > > > > > On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > Bumping this thread to check if there's any feedback.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Jorge.
> > > > > > > >
> > > > > > > > On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate
> Otoya <
> > > > > > > > quilcate.jorge@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi everyone,
> > > > > > > > >
> > > > > > > > > I would like to start the discussion for KIP-634:
> > > > > > >
> > > > > >
> > > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
> > > > > > > > >
> > > > > > > > > Looking forward to your feedback.
> > > > > > > > >
> > > > > > > > > Thanks!
> > > > > > > > > Jorge.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
>
>

Re: [DISCUSS] KIP-634: Complementary support for headers in Kafka Streams DSL

Posted by John Roesler <vv...@apache.org>.
Hello Jorge,

Thanks for bringing this up again!

I've just read over the current version of the KIP.

1) I wonder if we really need RecordValue, since we now have
Record, and they are almost the same, both in API and in
purpose. What do you think about instead adding topic and
partition to Record?

2) I find the name "mapRecordValue" to be a bit confusing
because it seems like it would map the value of a record.
What do you think about "mapValueToRecord" instead?

3) I agree with adding the serde explicitly. However, it
would be good to state whether and when we'll automatically
wrap a value serde. For example, if the value serde is known
(or if we're using a default serde from the config), will
Streams automatically wrap it downstream of the record-
mapping operator?

Otherwise, your proposal looks good to me!

Thanks,
-John

On Tue, 2022-02-08 at 18:06 +0000, Jorge Esteban Quilcate
Otoya wrote:
> Hi Dev team,
> 
> I'd like to revamp the KIP again:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> 
> - Reference implementation is now using the latest `Processor` API from
> KIP-478: https://github.com/apache/kafka/pull/10265/files for both
> Processors backing changes on the KStream API.
> - It is proposing to still extend `To` class for backwards compatibility.
> 
> Looking forward to your feedback.
> 
> Regards,
> Jorge.
> 


Re: [DISCUSS] KIP-634: Complementary support for headers in Kafka Streams DSL

Posted by Jorge Esteban Quilcate Otoya <qu...@gmail.com>.
Hi Dev team,

I'd like to revamp the KIP again:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL

- The reference implementation now uses the latest `Processor` API from
KIP-478: https://github.com/apache/kafka/pull/10265/files for both
Processors backing the changes to the KStream API.
- It still proposes extending the `To` class for backwards compatibility.

Looking forward to your feedback.

Regards,
Jorge.

On Thu, 4 Mar 2021 at 18:38, Jorge Esteban Quilcate Otoya <
quilcate.jorge@gmail.com> wrote:

> Hi everyone!
>
> I'd like to revamp this KIP. I have made some significant changes on the
> scope:
> - Added `mapRecordValue` to map not only headers, but other record
> metadata: topic name, partition, offset, and timestamp into a new type
> `RecordValue<V>`.
> - Added a serde for `RecordValue` to support stateful operations.
> - Added `setRecordHeaders` to apply headers to record crossing the stream.
> - Added headers to `To` to update headers via `context.forward(k, v, to)`.
>
> New link:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
>
> Looking forward to your feedback,
>
> Cheers and stay safe,
> Jorge.
>

Re: [DISCUSS] KIP-634: Complementary support for headers in Kafka Streams DSL

Posted by Jorge Esteban Quilcate Otoya <qu...@gmail.com>.
Hi everyone!

I'd like to revamp this KIP. I have made some significant changes to the
scope:
- Added `mapRecordValue` to map not only headers, but other record
metadata: topic name, partition, offset, and timestamp into a new type
`RecordValue<V>`.
- Added a serde for `RecordValue` to support stateful operations.
- Added `setRecordHeaders` to apply headers to records crossing the stream.
- Added headers to `To` to update headers via `context.forward(k, v, to)`.
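
To make the first item more concrete, here is a minimal sketch of what a
value wrapper carrying record metadata could look like. It is not the KIP's
actual `RecordValue<V>` definition: the class names `SketchRecordValue` and
`RecordValueSketch`, the field names, and the `mapValue` helper are all
assumptions made for illustration, and no Kafka classes are used.

```java
import java.util.function.Function;

// Hypothetical shape of RecordValue<V>; the field names are assumptions.
final class SketchRecordValue<V> {
    final String topic;
    final int partition;
    final long offset;
    final long timestamp;
    final V value;

    SketchRecordValue(String topic, int partition, long offset, long timestamp, V value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.value = value;
    }

    // Keeps the metadata and transforms only the wrapped value.
    <R> SketchRecordValue<R> mapValue(Function<V, R> mapper) {
        return new SketchRecordValue<>(topic, partition, offset, timestamp, mapper.apply(value));
    }
}

final class RecordValueSketch {
    // A downstream step can read metadata as ordinary fields of the value.
    static String describe(SketchRecordValue<String> rv) {
        return rv.topic + "-" + rv.partition + "@" + rv.offset + ": " + rv.value;
    }

    public static void main(String[] args) {
        SketchRecordValue<String> rv =
            new SketchRecordValue<>("orders", 0, 42L, 1000L, "created");
        System.out.println(describe(rv.mapValue(String::toUpperCase)));
    }
}
```

Because the metadata travels inside the value, stateful operators would need
a serde for the wrapper, which is what the second item is about.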

New link:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL

Looking forward to your feedback,

Cheers and stay safe,
Jorge.

On Thu, Oct 29, 2020 at 12:33 AM Jorge Esteban Quilcate Otoya <
quilcate.jorge@gmail.com> wrote:

> Thanks Sophie! Haven't followed KIP-478 but sounds great.
> I'll be happy to help on that migration to the new PAPI if it's still an
> open issue. We can bump this KIP after that.
>
> Cheers,
> Jorge.
>

Re: [DISCUSS] KIP-634: Complementary support for headers in Kafka Streams DSL

Posted by Jorge Esteban Quilcate Otoya <qu...@gmail.com>.
Thanks Sophie! Haven't followed KIP-478 but sounds great.
I'll be happy to help on that migration to the new PAPI if it's still an
open issue. We can bump this KIP after that.

Cheers,
Jorge.

On Wed, Oct 28, 2020 at 7:00 PM Sophie Blee-Goldman <so...@confluent.io>
wrote:

> I *think* that the `To` Matthias was referring to was not KStream#to but
> the To class
> which is accepted as a possible parameter of ProcessorContext#forward
> (correct
> me if wrong).
>
> This was on the old ProcessorContext interface, which has now been
> replaced with
> the new api.ProcessorContext in KIP-478. In the new interface we've moved
> away
> from the forward signatures that accept a separate key or value or
> timestamp or To,
> and wrapped all of these into a single Record class. This new Record class
> has the
> headers as a field, so it seems like KIP-478 has happened to solve the lack
> of support
> for Headers in the PAPI along the way.
>
> This is all somewhat recent, and probably wasn't yet sorted out at the time
> of Matthias'
> last reply. But given how this worked out it seems like we can just focus
> on adding
> support for Headers in the DSL in this KIP by building off of the
> groundwork of
> KIP-478? It doesn't seem necessary to go back and add support for headers
> in the old
> PAPI, since this will (or already has?) been deprecated.
>
> The one challenge is that this will presumably require that we migrate all
> DSL operators
> to the new PAPI before adding header support for those operators. But that
> definitely
> sounds achievable here
>

Re: [DISCUSS] KIP-634: Complementary support for headers in Kafka Streams DSL

Posted by Sophie Blee-Goldman <so...@confluent.io>.
I *think* that the `To` Matthias was referring to was not KStream#to but
the To class, which is accepted as a possible parameter of
ProcessorContext#forward (correct me if I'm wrong).

This was on the old ProcessorContext interface, which has now been
replaced with the new api.ProcessorContext in KIP-478. In the new
interface we've moved away from the forward signatures that accept a
separate key or value or timestamp or To, and wrapped all of these into a
single Record class. This new Record class has the headers as a field, so
it seems like KIP-478 has happened to solve the lack of support for
Headers in the PAPI along the way.
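
As a rough illustration of why carrying headers on the record itself helps,
the sketch below emits two outputs from one input, each with its own extra
header, without any shared mutable state. `SketchRecord` and
`RecordForwardSketch` are hypothetical stand-ins written for this example;
they are not the KIP-478 `Record` class, and headers are reduced to a list
of keys to keep it short.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in, not org.apache.kafka.streams.processor.api.Record.
final class SketchRecord<K, V> {
    final K key;
    final V value;
    final long timestamp;
    final List<String> headers; // header keys only, to keep the sketch short

    SketchRecord(K key, V value, long timestamp, List<String> headers) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        // Defensive copy so two records never share one header list.
        this.headers = Collections.unmodifiableList(new ArrayList<>(headers));
    }

    // Returns a new record with one more header; this record is unchanged.
    SketchRecord<K, V> withHeader(String headerKey) {
        List<String> copy = new ArrayList<>(headers);
        copy.add(headerKey);
        return new SketchRecord<>(key, value, timestamp, copy);
    }
}

final class RecordForwardSketch {
    // Emits two outputs from one input, each with its own extra header.
    static <K, V> List<SketchRecord<K, V>> fanOut(SketchRecord<K, V> input) {
        List<SketchRecord<K, V>> out = new ArrayList<>();
        out.add(input.withHeader("first"));
        out.add(input.withHeader("second"));
        return out;
    }

    public static void main(String[] args) {
        List<String> base = new ArrayList<>();
        base.add("trace");
        SketchRecord<String, String> input = new SketchRecord<>("k", "v", 1L, base);
        for (SketchRecord<String, String> r : fanOut(input)) {
            System.out.println(r.headers);
        }
    }
}
```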

This is all somewhat recent, and probably wasn't yet sorted out at the
time of Matthias' last reply. But given how this worked out it seems like
we can just focus on adding support for Headers in the DSL in this KIP by
building off of the groundwork of KIP-478? It doesn't seem necessary to go
back and add support for headers in the old PAPI, since this will be (or
already has been?) deprecated.

The one challenge is that this will presumably require that we migrate
all DSL operators to the new PAPI before adding header support for those
operators. But that definitely sounds achievable here.

On Wed, Oct 28, 2020 at 11:10 AM Jorge Esteban Quilcate Otoya <
quilcate.jorge@gmail.com> wrote:

> Hi Matthias,
>
> Sorry for the late reply.
>
> I like the proposal. Just to check if I got it right:
>
> We can extend the `kstream.to()` function to support setting headers.
> e.g.:
>
> ```
>     void to(final String topic,
>             final Produced<K, V> produced,
>             final HeadersExtractor<K, V> headersExtractor);
> ```
>
> where `HeadersExtractor`:
>
> ```
> public interface HeadersExtractor<K, V> {
>     Headers extract(final K key, final V value, final RecordContext
> recordContext);
> }
> ```
>
>  This would require to change `Topology#addSink()` to support this
> extractor as well.
>
> If this is aligned with your proposal, I'm happy to add it to this KIP.
>
> Cheers,
> Jorge.
>

Re: [DISCUSS] KIP-634: Complementary support for headers in Kafka Streams DSL

Posted by Jorge Esteban Quilcate Otoya <qu...@gmail.com>.
Hi Matthias,

Sorry for the late reply.

I like the proposal. Just to check if I got it right:

We can extend the `kstream.to()` function to support setting headers. e.g.:

```
    void to(final String topic,
            final Produced<K, V> produced,
            final HeadersExtractor<K, V> headersExtractor);
```

where `HeadersExtractor`:

```
public interface HeadersExtractor<K, V> {
    Headers extract(final K key,
                    final V value,
                    final RecordContext recordContext);
}
```

This would require changing `Topology#addSink()` to support this
extractor as well.
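
For illustration only, an extractor of this shape could be implemented as a
lambda. The sketch below is self-contained and does not use Kafka classes:
`SketchRecordContext`, `SketchHeadersExtractor`, and
`HeadersExtractorSketch` are hypothetical stand-ins, a `Map<String, byte[]>`
stands in for `Headers`, and the header names are made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for RecordContext; only the fields used here.
final class SketchRecordContext {
    final String topic;
    final int partition;

    SketchRecordContext(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Same shape as the proposed interface, with a Map standing in for Headers.
@FunctionalInterface
interface SketchHeadersExtractor<K, V> {
    Map<String, byte[]> extract(K key, V value, SketchRecordContext recordContext);
}

final class HeadersExtractorSketch {
    // Builds headers that record where the input record came from.
    static final SketchHeadersExtractor<String, String> ORIGIN =
        (key, value, ctx) -> {
            Map<String, byte[]> headers = new LinkedHashMap<>();
            headers.put("origin-topic",
                ctx.topic.getBytes(StandardCharsets.UTF_8));
            headers.put("origin-partition",
                Integer.toString(ctx.partition).getBytes(StandardCharsets.UTF_8));
            return headers;
        };

    public static void main(String[] args) {
        Map<String, byte[]> h =
            ORIGIN.extract("k", "v", new SketchRecordContext("orders", 3));
        System.out.println(new String(h.get("origin-topic"), StandardCharsets.UTF_8));
    }
}
```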

If this is aligned with your proposal, I'm happy to add it to this KIP.

Cheers,
Jorge.

On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <mj...@apache.org> wrote:

> Jorge,
>
> thanks a lot for this KIP. Being able to modify headers is a very
> valuable feature.
>
> However, before we actually expose them in the DSL, I am wondering if we
> should improve how headers can be modified in the PAPI? Currently, it is
> possible but very clumsy to work with headers in the Processor API,
> because of two reasons:
>
>  (1) There is no default `Headers` implementation in the public API
>  (2) There is no explicit way to set headers for output records
>
> Currently, the input record headers are copied into the output records
> when `forward()` is called, however, it's not really a deep copy but we
> just copy the reference. This implies that one needs to work with a
> single mutable object that flows through multiple processors making it
> very error prone.
>
> Furthermore, if you want to emit multiple output records, and for
> example want to add two different headers to the output record (based on
> the same input headers), you would need to do something like this:
>
>   Headers h = context.headers();
>   h.add(...);
>   context.forward(...);
>   // remove the header you added for the first output record
>   h.remove(...);
>   h.add(...);
>   context.forward(...);
>
>
> Maybe we could extend `To` to allow passing in a new `Headers` object
> (or an `Iterable<Header>` similar to `ProducerRecord`)? We could either
> add it to your KIP or do a new KIP just for the PAPI.
>
> Thoughts?
>
>
> -Matthias
>

Re: [DISCUSS] KIP-634: Complementary support for headers in Kafka Streams DSL

Posted by "Matthias J. Sax" <mj...@apache.org>.
Jorge,

thanks a lot for this KIP. Being able to modify headers is a very
valuable feature.

However, before we actually expose them in the DSL, I am wondering if we
should first improve how headers can be modified in the PAPI. Currently, it is
possible but very clumsy to work with headers in the Processor API,
for two reasons:

 (1) There is no default `Headers` implementation in the public API
 (2) There is no explicit way to set headers for output records

Currently, the input record headers are passed on to the output records
when `forward()` is called. However, this is not a deep copy: we
just copy the reference. This implies that one needs to work with a
single mutable object that flows through multiple processors, which is
very error-prone.

Furthermore, if you want to emit multiple output records and, for
example, want to add a different header to each output record (based on
the same input headers), you would need to do something like this:

  Headers h = context.headers();
  h.add(...);
  context.forward(...);
  // remove the header you added for the first output record
  h.remove(...);
  h.add(...);
  context.forward(...);
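
To make the aliasing problem concrete, here is a minimal, self-contained
sketch. It does not use the Kafka classes: `ToyHeaders`, `forwardShared()`
and `forwardCopied()` are hypothetical stand-ins that only model "forward
passes the same headers reference downstream". The second method shows what
passing a fresh headers object per output record might look like.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these types are part of Kafka Streams.
public class HeaderAliasingSketch {

    /** Hypothetical minimal mutable header collection (key/value pairs). */
    static final class ToyHeaders {
        private final List<String[]> entries = new ArrayList<>();

        ToyHeaders add(String key, String value) {
            entries.add(new String[] {key, value});
            return this;
        }

        /** Returns an independent copy with the same entries. */
        ToyHeaders copy() {
            ToyHeaders c = new ToyHeaders();
            for (String[] e : entries) {
                c.add(e[0], e[1]);
            }
            return c;
        }

        List<String> keys() {
            List<String> ks = new ArrayList<>();
            for (String[] e : entries) {
                ks.add(e[0]);
            }
            return ks;
        }
    }

    /** Emits two outputs that share one mutable headers object. */
    static List<ToyHeaders> forwardShared(ToyHeaders input) {
        List<ToyHeaders> downstream = new ArrayList<>();
        input.add("only-first", "a");
        downstream.add(input);          // "forward": only the reference is passed
        input.add("only-second", "b");  // also visible through the first output
        downstream.add(input);
        return downstream;
    }

    /** Emits two outputs, each with its own copy of the input headers. */
    static List<ToyHeaders> forwardCopied(ToyHeaders input) {
        List<ToyHeaders> downstream = new ArrayList<>();
        downstream.add(input.copy().add("only-first", "a"));
        downstream.add(input.copy().add("only-second", "b"));
        return downstream;
    }

    public static void main(String[] args) {
        List<ToyHeaders> shared = forwardShared(new ToyHeaders().add("trace", "1"));
        System.out.println("shared, first output:  " + shared.get(0).keys());
        System.out.println("shared, second output: " + shared.get(1).keys());

        List<ToyHeaders> copied = forwardCopied(new ToyHeaders().add("trace", "1"));
        System.out.println("copied, first output:  " + copied.get(0).keys());
        System.out.println("copied, second output: " + copied.get(1).keys());
    }
}
```

In the shared variant both outputs end up with both extra headers unless
the first one is removed again before the second `forward()`, which is the
clumsy remove/add step above. With a copy per output, the input headers stay
untouched and each output carries only its own extra header.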


Maybe we could extend `To` to allow passing in a new `Headers` object
(or an `Iterable<Header>` similar to `ProducerRecord`)? We could either
add it to your KIP or do a new KIP just for the PAPI.

Thoughts?


-Matthias

On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
> Hi everyone,
> 
> Bumping this thread to check if there's any feedback.
> 
> Cheers,
> Jorge.
> 
> On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate Otoya <
> quilcate.jorge@gmail.com> wrote:
> 
>> Hi everyone,
>>
>> I would like to start the discussion for KIP-634:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
>>
>> Looking forward to your feedback.
>>
>> Thanks!
>> Jorge.
>>
>>
>>
>>
>