Posted to user@flink.apache.org by "Tzu-Li (Gordon) Tai" <tz...@apache.org> on 2017/03/07 06:08:00 UTC

Re: FlinkKafkaConsumer010 - creating a data stream of type DataStream<ConsumerRecord<K,V>>

Hi Dominik,

I would recommend implementing a `KeyedDeserializationSchema` and supplying it to the constructor when initializing your FlinkKafkaConsumer.

The `KeyedDeserializationSchema` exposes each record's metadata, such as its offset, partition, and key. In the schema, you implement your own logic for turning the binary data from Kafka into data types that carry that metadata along with the record value, e.g. POJOs or Tuples (see the sketch after the links below).

Some links for more info on this:
1. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kafka.html#the-deserializationschema
2. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#flinks-typeinformation-class
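
For example, here is a minimal sketch of such a schema, based on the Flink 1.3 interface; the MetadataDeserializationSchema and RecordWithMetadata names (and the POJO's fields) are just illustrative:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class MetadataDeserializationSchema
        implements KeyedDeserializationSchema<MetadataDeserializationSchema.RecordWithMetadata> {

    // Illustrative POJO carrying the record value together with its Kafka metadata.
    public static class RecordWithMetadata {
        public String key;
        public String value;
        public String topic;
        public int partition;
        public long offset;

        public RecordWithMetadata() {} // Flink POJOs need a public no-arg constructor
    }

    @Override
    public RecordWithMetadata deserialize(
            byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException {
        RecordWithMetadata record = new RecordWithMetadata();
        record.key = messageKey == null ? null : new String(messageKey, StandardCharsets.UTF_8);
        record.value = message == null ? null : new String(message, StandardCharsets.UTF_8);
        record.topic = topic;
        record.partition = partition;
        record.offset = offset;
        return record;
    }

    @Override
    public boolean isEndOfStream(RecordWithMetadata nextElement) {
        return false; // the Kafka stream is unbounded
    }

    @Override
    public TypeInformation<RecordWithMetadata> getProducedType() {
        return TypeInformation.of(RecordWithMetadata.class);
    }
}

Note that `getProducedType()` comes from `ResultTypeQueryable`, which `KeyedDeserializationSchema` extends.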

The metadata `KeyedDeserializationSchema` exposes is extracted from `ConsumerRecord`s within the Kafka connector, so it doesn’t make sense to wrap it up again into a `ConsumerRecord`. The schema interface exposes all available metadata of the record, so it should be sufficient.
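
Wiring it into the consumer would then look roughly like this (assuming a `StreamExecutionEnvironment` named `env`; the topic name, group id, and broker address are placeholders):

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");

DataStream<MetadataDeserializationSchema.RecordWithMetadata> stream =
    env.addSource(new FlinkKafkaConsumer010<>(
        "my-topic", new MetadataDeserializationSchema(), props));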

Cheers,
Gordon

On March 7, 2017 at 3:51:59 AM, Dominik Safaric (dominiksafaric@gmail.com) wrote:

Hi,  

Unfortunately, I cannot find a way to use raw ConsumerRecord<K,V> instances when creating a Kafka data stream.

I would like to use this type because our use case requires record metadata such as the offset and partition.

So far I’ve examined the source code of the Kafka connector and checked the docs, but I could not find a way to create a data stream of type DataStream<ConsumerRecord<K,V>>.

Am I missing something, or do I have to implement this myself and build Flink from source?

Thanks in advance,  
Dominik

Re: FlinkKafkaConsumer010 - creating a data stream of type DataStream<ConsumerRecord<K,V>>

Posted by Dominik Safaric <do...@gmail.com>.
Hi Gordon,

Thanks for the advice. Following it, I’ve implemented the KeyedDeserializationSchema and am now able to emit the metadata to downstream operators.
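
In case it helps anyone else: the downstream operators then just read the metadata as plain fields off the POJO, roughly like this (using the RecordWithMetadata type from your sketch):

import org.apache.flink.api.common.functions.MapFunction;

stream.map(new MapFunction<MetadataDeserializationSchema.RecordWithMetadata, String>() {
    @Override
    public String map(MetadataDeserializationSchema.RecordWithMetadata r) {
        // e.g. tag each value with the partition/offset it came from
        return r.topic + "-" + r.partition + "@" + r.offset + ": " + r.value;
    }
});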

Regards,
Dominik
