You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Alex Gout <al...@shopify.com.INVALID> on 2023/02/13 15:08:24 UTC

Proposal to add support for Kafka Headers to KafkaRecordSerializationSchemaWrapper

Hi all,

I'm currently working on a few pipelines sinking to Kafka. The downstream
consumers of the sink topics expect some Kafka headers to be set. However
the default org.apache.flink.connector.kafka.sink.KafkaSink does
not support adding Kafka record headers.

I tracked the code path down to
org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper
where the RecordProducer is created.
It is relatively simple to add support for record headers by adding a
"HeaderProducer" next to the key and value serializers and using the
appropriate RecordProducer constructor.

For the benefit of my own projects, I have implemented this header support
and would be eager to share my implementation as a proposal if there's a
consensus this would indeed be a valuable addition.

Please let me know what you think.
Thanks,
- Alex

Re: Proposal to add support for Kafka Headers to KafkaRecordSerializationSchemaWrapper

Posted by Jing Ge <ji...@ververica.com.INVALID>.
Hi Alex,

Just assigned the ticket to you. Kafka Header should be supported as we
started implementing the KafkaSink. Thanks for driving this!

Best regards,
Jing

On Fri, Feb 17, 2023 at 3:35 PM Alex Gout <al...@shopify.com.invalid>
wrote:

> Hey :)
> I created a JIRA <https://issues.apache.org/jira/browse/FLINK-31049> for
> it. Can someone assign it to me?
>
> On Mon, Feb 13, 2023 at 3:17 PM Márton Balassi <ba...@gmail.com>
> wrote:
>
> > Hi Alex,
> >
> > Please do share, this comes up somewhat frequently.
> >
> > Marton
> >
> > On Mon, Feb 13, 2023 at 7:44 PM Őrhidi Mátyás <ma...@gmail.com>
> > wrote:
> >
> > > Hi Alex,
> > >
> > > This is a reasonable request IMO. I've recently bumped into this topic
> > > myself. This could be handy for supporting schema registries in Kafka
> to
> > > Kafka scenarios for example. Looking forward to your proposal.
> > >
> > > Cheers,
> > > Matyas
> > >
> > > On Mon, Feb 13, 2023 at 7:08 AM Alex Gout
> <alex.gout@shopify.com.invalid
> > >
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'm currently working on a few pipelines sinking to Kafka. The
> > downstream
> > > > consumers of the sink topics expect some Kafka headers to be set.
> > However
> > > > the default org.apache.flink.connector.kafka.sink.KafkaSink does
> > > > not support adding Kafka record headers.
> > > >
> > > > I tracked the code path down to
> > > >
> > >
> >
> org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper
> > > > where the RecordProducer is created.
> > > > It is relatively simple to add support for record headers by adding a
> > > > "HeaderProducer" next to the key and value serializers and using the
> > > > appropriate RecordProducer constructor.
> > > >
> > > > For the benefit of my own projects, I have implemented this header
> > > support
> > > > and would be eager to share my implementation as a proposal if
> there's
> > a
> > > > consensus this would indeed be a valuable addition.
> > > >
> > > > Please let me know what you think.
> > > > Thanks,
> > > > - Alex
> > > >
> > >
> >
>

Re: Proposal to add support for Kafka Headers to KafkaRecordSerializationSchemaWrapper

Posted by Alex Gout <al...@shopify.com.INVALID>.
Hey :)
I created a JIRA <https://issues.apache.org/jira/browse/FLINK-31049> for
it. Can someone assign it to me?

On Mon, Feb 13, 2023 at 3:17 PM Márton Balassi <ba...@gmail.com>
wrote:

> Hi Alex,
>
> Please do share, this comes up somewhat frequently.
>
> Marton
>
> On Mon, Feb 13, 2023 at 7:44 PM Őrhidi Mátyás <ma...@gmail.com>
> wrote:
>
> > Hi Alex,
> >
> > This is a reasonable request IMO. I've recently bumped into this topic
> > myself. This could be handy for supporting schema registries in Kafka to
> > Kafka scenarios for example. Looking forward to your proposal.
> >
> > Cheers,
> > Matyas
> >
> > On Mon, Feb 13, 2023 at 7:08 AM Alex Gout <alex.gout@shopify.com.invalid
> >
> > wrote:
> >
> > > Hi all,
> > >
> > > I'm currently working on a few pipelines sinking to Kafka. The
> downstream
> > > consumers of the sink topics expect some Kafka headers to be set.
> However
> > > the default org.apache.flink.connector.kafka.sink.KafkaSink does
> > > not support adding Kafka record headers.
> > >
> > > I tracked the code path down to
> > >
> >
> org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper
> > > where the RecordProducer is created.
> > > It is relatively simple to add support for record headers by adding a
> > > "HeaderProducer" next to the key and value serializers and using the
> > > appropriate RecordProducer constructor.
> > >
> > > For the benefit of my own projects, I have implemented this header
> > support
> > > and would be eager to share my implementation as a proposal if there's
> a
> > > consensus this would indeed be a valuable addition.
> > >
> > > Please let me know what you think.
> > > Thanks,
> > > - Alex
> > >
> >
>

Re: Proposal to add support for Kafka Headers to KafkaRecordSerializationSchemaWrapper

Posted by Márton Balassi <ba...@gmail.com>.
Hi Alex,

Please do share, this comes up somewhat frequently.

Marton

On Mon, Feb 13, 2023 at 7:44 PM Őrhidi Mátyás <ma...@gmail.com>
wrote:

> Hi Alex,
>
> This is a reasonable request IMO. I've recently bumped into this topic
> myself. This could be handy for supporting schema registries in Kafka to
> Kafka scenarios for example. Looking forward to your proposal.
>
> Cheers,
> Matyas
>
> On Mon, Feb 13, 2023 at 7:08 AM Alex Gout <al...@shopify.com.invalid>
> wrote:
>
> > Hi all,
> >
> > I'm currently working on a few pipelines sinking to Kafka. The downstream
> > consumers of the sink topics expect some Kafka headers to be set. However
> > the default org.apache.flink.connector.kafka.sink.KafkaSink does
> > not support adding Kafka record headers.
> >
> > I tracked the code path down to
> >
> org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper
> > where the RecordProducer is created.
> > It is relatively simple to add support for record headers by adding a
> > "HeaderProducer" next to the key and value serializers and using the
> > appropriate RecordProducer constructor.
> >
> > For the benefit of my own projects, I have implemented this header
> support
> > and would be eager to share my implementation as a proposal if there's a
> > consensus this would indeed be a valuable addition.
> >
> > Please let me know what you think.
> > Thanks,
> > - Alex
> >
>

Re: Proposal to add support for Kafka Headers to KafkaRecordSerializationSchemaWrapper

Posted by Őrhidi Mátyás <ma...@gmail.com>.
Hi Alex,

This is a reasonable request IMO. I've recently bumped into this topic
myself. This could be handy for supporting schema registries in Kafka to
Kafka scenarios for example. Looking forward to your proposal.

Cheers,
Matyas

On Mon, Feb 13, 2023 at 7:08 AM Alex Gout <al...@shopify.com.invalid>
wrote:

> Hi all,
>
> I'm currently working on a few pipelines sinking to Kafka. The downstream
> consumers of the sink topics expect some Kafka headers to be set. However
> the default org.apache.flink.connector.kafka.sink.KafkaSink does
> not support adding Kafka record headers.
>
> I tracked the code path down to
> org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper
> where the RecordProducer is created.
> It is relatively simple to add support for record headers by adding a
> "HeaderProducer" next to the key and value serializers and using the
> appropriate RecordProducer constructor.
>
> For the benefit of my own projects, I have implemented this header support
> and would be eager to share my implementation as a proposal if there's a
> consensus this would indeed be a valuable addition.
>
> Please let me know what you think.
> Thanks,
> - Alex
>