You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Allen Wang (JIRA)" <ji...@apache.org> on 2019/01/14 20:48:00 UTC
[jira] [Updated] (FLINK-11303) Utilizing Kafka headers for
serialization and deserialization
[ https://issues.apache.org/jira/browse/FLINK-11303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Allen Wang updated FLINK-11303:
-------------------------------
Description:
Kafka introduces headers in producer and consumer record since version 0.11. This is the high level description: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers]
However, current Flink Kafka connector simply ignores the headers. This will make it hard to integrate with the Kafka ecosystem where other Kafka clients make use of the headers.
I propose to support headers in Flink by modifying the following API:
* In KeyedSerializationSchema, add
{code:java}
List<Tuple2<String, byte[]>> getHeaders(T element)
{code}
* In KeyedDeserializationSchema, add
{code:java}
T deserailize(byte[] messageKey, byte[] message, List<Tuple2<String, byte[]>> headers, String topic, int partition, long offset) throws IOException{code}
These new methods will be invoked by FlinkKafkaProducer and KafkaFetcher in the serialization and deserialization process. If backward compatibility is a concern, we can add default implementation to these methods where headers are ignored.
was:
Kafka introduces headers in producer and consumer record since version 0.11. This is the high level description: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers]
However, current Flink Kafka connector simply ignores the headers. This will make it hard to integrate with the Kafka ecosystem where other Kafka clients make use of the headers.
I propose to support headers in Flink by modifying the following API:
* In KeyedSerializationSchema, add
{code:java}
List<Tuple2<String, byte[]>> getHeaders(T element)
{code}
* In KeyedDeserializationSchema, add
{code:java}
T deserailize(byte[] messageKey, byte[] message, List<Tuple2<String, byte[]>> headers, String topic, int partition, long offset) throws IOException{code}
These new methods will be invoked by FlinkKafkaProducer and KafkaFetcher in the serialization and deserialization process. If backward compatibility is a concern, we can add default implementation to these methods where headers are ignored.
If backward compatiblity
> Utilizing Kafka headers for serialization and deserialization
> -------------------------------------------------------------
>
> Key: FLINK-11303
> URL: https://issues.apache.org/jira/browse/FLINK-11303
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Allen Wang
> Priority: Major
>
> Kafka introduces headers in producer and consumer record since version 0.11. This is the high level description: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers]
> However, current Flink Kafka connector simply ignores the headers. This will make it hard to integrate with the Kafka ecosystem where other Kafka clients make use of the headers.
>
> I propose to support headers in Flink by modifying the following API:
> * In KeyedSerializationSchema, add
> {code:java}
> List<Tuple2<String, byte[]>> getHeaders(T element)
> {code}
> * In KeyedDeserializationSchema, add
> {code:java}
> T deserailize(byte[] messageKey, byte[] message, List<Tuple2<String, byte[]>> headers, String topic, int partition, long offset) throws IOException{code}
>
> These new methods will be invoked by FlinkKafkaProducer and KafkaFetcher in the serialization and deserialization process. If backward compatibility is a concern, we can add default implementation to these methods where headers are ignored.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)