Posted to user@flink.apache.org by Great Info <gu...@gmail.com> on 2022/10/12 18:03:47 UTC

Utilizing Kafka headers in Flink Kafka connector

I have some Flink applications that read streams from Kafka. The
producer-side code has recently started adding extra information to the Kafka
headers when producing records.
I now need to change my consumer-side logic to process a record only if a
header contains a specific value; if the header value is different from the
one I am looking for, I just need to move on to the next record in the stream.

I found some sample reference code
<https://issues.apache.org/jira/browse/KAFKA-4208>, but that logic needs to
deserialize the record before verifying the header. Is there a simple way to
skip the record before deserializing it?

Re: Utilizing Kafka headers in Flink Kafka connector

Posted by Shengkai Fang <fs...@gmail.com>.
Hi,

You can use the SQL API to parse or write the header in the Kafka record [1] if
you are using Flink SQL.

Best,
Shengkai

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#available-metadata
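
For context, the Kafka SQL connector linked above exposes record headers as a
metadata column. A rough sketch of the idea (the table, topic, header key, and
expected value here are placeholders, not from this thread):

```sql
-- Declare the Kafka record headers as a metadata column on the source table.
CREATE TABLE events (
  `payload` STRING,
  `headers` MAP<STRING, BYTES> METADATA  -- Kafka record headers
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'header-filter-demo',
  'format' = 'json'
);

-- Keep only records whose 'source' header carries the expected value;
-- ENCODE converts the expected string to BYTES for the comparison.
SELECT `payload`
FROM events
WHERE `headers`['source'] = ENCODE('allowed', 'UTF-8');
```

Note that the filter runs after the record has been read and decoded by the
format, so this addresses the routing logic rather than skipping
deserialization entirely.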

Yaroslav Tkachenko <ya...@goldsky.com> wrote on Thu, Oct 13, 2022 at 02:21:

> Hi,
>
> You can implement a custom KafkaRecordDeserializationSchema (example
> https://docs.immerok.cloud/docs/cookbook/reading-apache-kafka-headers-with-apache-flink/#the-custom-deserializer)
> and simply avoid emitting the record if the header value doesn't match what
> you need.
>
> On Wed, Oct 12, 2022 at 11:04 AM Great Info <gu...@gmail.com> wrote:
>
>> I have some Flink applications that read streams from Kafka. The
>> producer-side code has recently started adding extra information to the Kafka
>> headers when producing records.
>> I now need to change my consumer-side logic to process a record only if a
>> header contains a specific value; if the header value is different from the
>> one I am looking for, I just need to move on to the next record in the stream.
>>
>> I found some sample reference code
>> <https://issues.apache.org/jira/browse/KAFKA-4208>, but that logic needs
>> to deserialize the record before verifying the header. Is there a simple way
>> to skip the record before deserializing it?
>>
>

Re: Utilizing Kafka headers in Flink Kafka connector

Posted by Yaroslav Tkachenko <ya...@goldsky.com>.
Hi,

You can implement a custom KafkaRecordDeserializationSchema (example
https://docs.immerok.cloud/docs/cookbook/reading-apache-kafka-headers-with-apache-flink/#the-custom-deserializer)
and simply avoid emitting the record if the header value doesn't match what
you need.
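
To make that concrete: inside the schema's deserialize method you would
inspect the record's headers and simply return without collecting when the
header doesn't match, so the value itself is never deserialized. A minimal,
self-contained sketch of just the matching logic (the header key "source" and
expected value "allowed" are made-up placeholders):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;

public class HeaderFilter {

    // Returns true when the header map contains `key` with exactly the
    // expected bytes. Inside a custom KafkaRecordDeserializationSchema you
    // would run this check before deserializing the record value, and skip
    // out.collect(...) when it returns false.
    static boolean headerMatches(Map<String, byte[]> headers,
                                 String key, byte[] expected) {
        byte[] actual = headers.get(key);
        return actual != null && Arrays.equals(actual, expected);
    }

    public static void main(String[] args) {
        byte[] expected = "allowed".getBytes(StandardCharsets.UTF_8);
        Map<String, byte[]> headers =
                Map.of("source", "allowed".getBytes(StandardCharsets.UTF_8));

        System.out.println(headerMatches(headers, "source", expected)); // true
        System.out.println(headerMatches(headers, "other", expected));  // false
    }
}
```

In the real deserializer the headers come from the ConsumerRecord passed to
deserialize(), and Kafka header values are already raw byte arrays, so the
check costs nothing beyond a byte comparison.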

On Wed, Oct 12, 2022 at 11:04 AM Great Info <gu...@gmail.com> wrote:

> I have some Flink applications that read streams from Kafka. The
> producer-side code has recently started adding extra information to the Kafka
> headers when producing records.
> I now need to change my consumer-side logic to process a record only if a
> header contains a specific value; if the header value is different from the
> one I am looking for, I just need to move on to the next record in the stream.
>
> I found some sample reference code
> <https://issues.apache.org/jira/browse/KAFKA-4208>, but that logic needs to
> deserialize the record before verifying the header. Is there a simple way to
> skip the record before deserializing it?
>
