Posted to issues@flink.apache.org by "Allen Wang (JIRA)" <ji...@apache.org> on 2019/01/14 20:48:00 UTC

[jira] [Updated] (FLINK-11303) Utilizing Kafka headers for serialization and deserialization

     [ https://issues.apache.org/jira/browse/FLINK-11303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Allen Wang updated FLINK-11303:
-------------------------------
    Description: 
Kafka introduced headers in producer and consumer records in version 0.11. The high-level description is in KIP-82: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers]

However, the current Flink Kafka connector simply ignores the headers. This makes it hard to integrate with the Kafka ecosystem, where other Kafka clients make use of headers.

 

I propose to support headers in Flink by modifying the following API:
 * In KeyedSerializationSchema, add
{code:java}
List<Tuple2<String, byte[]>> getHeaders(T element)
{code}

 * In KeyedDeserializationSchema, add
{code:java}
T deserialize(byte[] messageKey, byte[] message, List<Tuple2<String, byte[]>> headers, String topic, int partition, long offset) throws IOException
{code}

 

These new methods will be invoked by FlinkKafkaProducer and KafkaFetcher during serialization and deserialization. If backward compatibility is a concern, we can add default implementations of these methods that ignore headers.
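For concreteness, the proposal could look like the sketch below. This is a minimal, self-contained illustration, not actual Flink code: Tuple2 is stubbed in rather than imported from org.apache.flink.api.java.tuple, the two interfaces are abbreviated to the methods relevant here, and the "trace-id" header is a made-up example. The default methods show the backward-compatible path where headers are ignored:
{code:java}
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

public class HeaderSchemaSketch {

    // Stand-in for org.apache.flink.api.java.tuple.Tuple2, so the sketch compiles alone.
    static class Tuple2<A, B> {
        final A f0;
        final B f1;
        Tuple2(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Abbreviated KeyedSerializationSchema with the proposed getHeaders method.
    interface KeyedSerializationSchema<T> extends Serializable {
        byte[] serializeValue(T element);

        // Proposed addition; the default keeps existing implementations working
        // by emitting no headers.
        default List<Tuple2<String, byte[]>> getHeaders(T element) {
            return Collections.emptyList();
        }
    }

    // Abbreviated KeyedDeserializationSchema with the proposed header-aware overload.
    interface KeyedDeserializationSchema<T> extends Serializable {
        T deserialize(byte[] messageKey, byte[] message,
                      String topic, int partition, long offset) throws IOException;

        // Proposed addition; the default ignores headers and delegates to the
        // existing method, so old schemas need no changes.
        default T deserialize(byte[] messageKey, byte[] message,
                              List<Tuple2<String, byte[]>> headers,
                              String topic, int partition, long offset) throws IOException {
            return deserialize(messageKey, message, topic, partition, offset);
        }
    }

    public static void main(String[] args) throws IOException {
        // Producer side: attach a (hypothetical) trace-id header to every record.
        KeyedSerializationSchema<String> ser = new KeyedSerializationSchema<String>() {
            public byte[] serializeValue(String e) {
                return e.getBytes(StandardCharsets.UTF_8);
            }
            public List<Tuple2<String, byte[]>> getHeaders(String e) {
                return Collections.singletonList(
                        new Tuple2<>("trace-id", "abc123".getBytes(StandardCharsets.UTF_8)));
            }
        };
        System.out.println(ser.getHeaders("msg").get(0).f0);

        // Consumer side: an old-style schema still works; the default overload
        // silently drops the headers.
        KeyedDeserializationSchema<String> de =
                (key, msg, topic, partition, offset) -> new String(msg, StandardCharsets.UTF_8);
        System.out.println(de.deserialize(null, "payload".getBytes(StandardCharsets.UTF_8),
                ser.getHeaders("msg"), "topic", 0, 0L));
    }
}
{code}
Because the new methods have defaults, FlinkKafkaProducer and KafkaFetcher could call the header-aware variants unconditionally without breaking user code compiled against the older interfaces.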

 

  was:
Kafka introduced headers in producer and consumer records in version 0.11. The high-level description is in KIP-82: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers]

However, the current Flink Kafka connector simply ignores the headers. This makes it hard to integrate with the Kafka ecosystem, where other Kafka clients make use of headers.

 

I propose to support headers in Flink by modifying the following API:
 * In KeyedSerializationSchema, add
{code:java}
List<Tuple2<String, byte[]>> getHeaders(T element)
{code}

 * In KeyedDeserializationSchema, add
{code:java}
T deserialize(byte[] messageKey, byte[] message, List<Tuple2<String, byte[]>> headers, String topic, int partition, long offset) throws IOException
{code}

 

These new methods will be invoked by FlinkKafkaProducer and KafkaFetcher during serialization and deserialization. If backward compatibility is a concern, we can add default implementations of these methods that ignore headers.

 

If backward compatibility 


> Utilizing Kafka headers for serialization and deserialization
> -------------------------------------------------------------
>
>                 Key: FLINK-11303
>                 URL: https://issues.apache.org/jira/browse/FLINK-11303
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Allen Wang
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)