Posted to user@flink.apache.org by prateek arora <pr...@gmail.com> on 2016/04/22 00:02:44 UTC

How to fetch kafka Message have [KEY,VALUE] pair

Hi

I am new to Apache Flink and have started using Flink version 1.0.1.

In my scenario, each Kafka message is a key/value pair of type [String,Array[Byte]].

I tried to use FlinkKafkaConsumer08 to fetch the data, but I don't know how to
write a DeserializationSchema for that.

val stream: DataStream[(String, Array[Byte])] = env.addSource(
  new FlinkKafkaConsumer08[(String, Array[Byte])]("a-0", <DeserializationSchema>, properties))

Please help me solve this problem.

Regards
Prateek

Re: How to fetch kafka Message have [KEY,VALUE] pair

Posted by Robert Metzger <rm...@apache.org>.
If you've serialized your data with a custom format, you can also implement
a custom deserializer using the KeyedDeserializationSchema.
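
A minimal sketch of such a custom deserializer, for readers following along. It assumes the key was written as a UTF-8 string and that the value should be passed through as raw bytes; neither is confirmed in the thread. The class name StringKeyBytesValueSchema is hypothetical, and the five-argument deserialize signature is the one I recall from the Flink 1.0.x line, so check it against the version you use.

```scala
import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

// Hypothetical schema: UTF-8 string key, raw byte-array value (both assumptions).
class StringKeyBytesValueSchema
    extends KeyedDeserializationSchema[(String, Array[Byte])] {

  override def deserialize(
      messageKey: Array[Byte],
      message: Array[Byte],
      topic: String,
      partition: Int,
      offset: Long): (String, Array[Byte]) = {
    // Kafka keys are optional; mapping a missing key to "" is an arbitrary choice.
    val key =
      if (messageKey == null) "" else new String(messageKey, StandardCharsets.UTF_8)
    (key, message)
  }

  // The stream is unbounded, so no element marks its end.
  override def isEndOfStream(nextElement: (String, Array[Byte])): Boolean = false

  // Tells Flink which type this schema produces.
  override def getProducedType: TypeInformation[(String, Array[Byte])] =
    createTypeInformation[(String, Array[Byte])]
}

// Usage, with env and properties defined as in the original question:
//   val stream: DataStream[(String, Array[Byte])] = env.addSource(
//     new FlinkKafkaConsumer08[(String, Array[Byte])](
//       "a-0", new StringKeyBytesValueSchema, properties))
```

The topic, partition and offset arguments are ignored here; a schema that needs them for routing or debugging could carry them in its output type.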

On Fri, Apr 22, 2016 at 2:35 PM, Till Rohrmann <tr...@apache.org> wrote:

> Depending on how the key value pair is encoded, you could use the
> TypeInformationKeyValueSerializationSchema where you provide the
> BasicTypeInfo.STRING_TYPE_INFO and
> PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO as the key and
> value type information. But this only works if your data was serialized in
> a similar fashion.
>
> Cheers,
> Till

Re: How to fetch kafka Message have [KEY,VALUE] pair

Posted by Till Rohrmann <tr...@apache.org>.
Depending on how the key value pair is encoded, you could use the
TypeInformationKeyValueSerializationSchema where you provide the
BasicTypeInfo.STRING_TYPE_INFO and
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO as the key and value
type information. But this only works if your data was serialized in a
similar fashion.
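
The suggestion above could be wired up roughly as follows. This is a sketch under assumptions, not a verified recipe: it relies on the TypeInformationKeyValueSerializationSchema constructor that takes two TypeInformation objects plus an ExecutionConfig, and on the schema emitting Flink's Java Tuple2, as I recall from the 1.0.x API. The object and method names (KeyValueSource, keyValueSchema, build) are hypothetical. It only decodes correctly if the producer wrote keys and values with Flink's own serializers for these types.

```scala
import java.util.Properties

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo}
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema

object KeyValueSource {

  // Builds the schema from the two type infos named in the reply above.
  def keyValueSchema(config: ExecutionConfig)
      : TypeInformationKeyValueSerializationSchema[String, Array[Byte]] =
    new TypeInformationKeyValueSerializationSchema[String, Array[Byte]](
      BasicTypeInfo.STRING_TYPE_INFO,
      PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
      config)

  def build(env: StreamExecutionEnvironment,
            properties: Properties): DataStream[(String, Array[Byte])] = {
    val schema = keyValueSchema(env.getConfig)
    val consumer =
      new FlinkKafkaConsumer08[JTuple2[String, Array[Byte]]]("a-0", schema, properties)

    // The schema yields Java tuples; pass its type info explicitly,
    // then convert each element to a Scala tuple.
    env
      .addSource(consumer)(schema.getProducedType)
      .map(t => (t.f0, t.f1))
  }
}
```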

Cheers,
Till
​
