Posted to user@flink.apache.org by Hao Sun <ha...@zendesk.com> on 2018/12/18 19:22:22 UTC

Kafka consumer, is there a way to filter out messages using key only?

Hi, I am using 1.7 on K8S.

I have a huge amount of data in Kafka, but I only need a tiny portion of it.
It is a keyed stream, and the value is JSON-encoded. I want to avoid
deserializing the value, since that is very expensive. Can I filter
based on the key only?
I know there is a KeyedDeserializationSchema, but can I use it to filter
data?

Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103

Re: Kafka consumer, is there a way to filter out messages using key only?

Posted by Hao Sun <ha...@zendesk.com>.
Cool, thanks! I used the option value approach, and it worked well.

On Thu, Dec 27, 2018, 03:49 Dominik Wosiński <wo...@gmail.com> wrote:

> Hey,
> AFAIK, returning null from deserialize function in FlinkKafkaConsumer will
> indeed filter the message out and it won't be further processed.
>
> Best Regards,
> Dom.
>
> On Wed, 19 Dec 2018 at 11:06, Dawid Wysakowicz <dw...@apache.org>
> wrote:
>
>> Hi,
>>
>> I'm afraid that there is no out-of-the box solution for this, but what
>> you could do is to generate from KeyedDeserializationSchema some marker
>> (Optional, null value...) based on the message key, that would allow you
>> later to filter it out. So assuming the Optional solution the result of
>> KeyedDeserializationSchema#deserialize could be Optional.empty() for
>> invalid keys and Optional.of(deserializedValue) for valid keys.
>>
>> Best,
>>
>> Dawid

Re: Kafka consumer, is there a way to filter out messages using key only?

Posted by Dominik Wosiński <wo...@gmail.com>.
Hey,
AFAIK, returning null from the deserialize function used by FlinkKafkaConsumer
will indeed filter the message out, and it won't be processed further.
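
To make the null-return idea concrete, here is a minimal sketch in plain Java.
It does not import Flink: the deserialize method only mirrors the parameter
list of KeyedDeserializationSchema#deserialize, and emitNonNull is a stand-in
I made up to imitate the consumer dropping null results. The "vip-" key prefix,
the topic name and the class name are assumptions for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class NullSkipSketch {
    // Hypothetical key prefix; the thread does not say which keys are wanted.
    static final String WANTED_PREFIX = "vip-";

    // Same parameter list as KeyedDeserializationSchema#deserialize.
    // Returns null for records that should be skipped.
    static String deserialize(
            byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        if (messageKey == null || message == null) {
            return null; // nothing to match on, or nothing to parse
        }
        String key = new String(messageKey, StandardCharsets.UTF_8);
        if (!key.startsWith(WANTED_PREFIX)) {
            return null; // the value bytes are never decoded
        }
        // Placeholder for the real (expensive) JSON parsing.
        return new String(message, StandardCharsets.UTF_8);
    }

    // Stand-in for the consumer loop: only non-null results are emitted.
    static List<String> emitNonNull(String[][] records) {
        List<String> out = new ArrayList<>();
        long offset = 0;
        for (String[] record : records) {
            String value = deserialize(
                    record[0].getBytes(StandardCharsets.UTF_8),
                    record[1].getBytes(StandardCharsets.UTF_8),
                    "events", 0, offset++);
            if (value != null) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] records = {
            {"vip-1", "{\"n\":1}"},
            {"std-2", "{\"n\":2}"},
            {"vip-3", "{\"n\":3}"},
        };
        System.out.println(emitNonNull(records));
    }
}
```

In a real job the same key check would live inside a class implementing
KeyedDeserializationSchema, and the skipping would be done by the consumer
itself rather than by a helper like emitNonNull.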

Best Regards,
Dom.

On Wed, 19 Dec 2018 at 11:06, Dawid Wysakowicz <dw...@apache.org>
wrote:

> Hi,
>
> I'm afraid that there is no out-of-the box solution for this, but what you
> could do is to generate from KeyedDeserializationSchema some marker
> (Optional, null value...) based on the message key, that would allow you
> later to filter it out. So assuming the Optional solution the result of
> KeyedDeserializationSchema#deserialize could be Optional.empty() for
> invalid keys and Optional.of(deserializedValue) for valid keys.
>
> Best,
>
> Dawid

Re: Kafka consumer, is there a way to filter out messages using key only?

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,

I'm afraid there is no out-of-the-box solution for this, but what
you could do is have KeyedDeserializationSchema produce some marker
(Optional, null value...) based on the message key, which would allow you
to filter the record out later. So, assuming the Optional solution, the result
of KeyedDeserializationSchema#deserialize could be Optional.empty() for
invalid keys and Optional.of(deserializedValue) for valid keys.
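
A rough sketch of the Optional variant, in plain Java so it runs without Flink
on the classpath. The deserialize method below only mirrors the parameter list
of KeyedDeserializationSchema#deserialize; a real implementation would
implement that interface and also provide isEndOfStream and getProducedType.
The class name, the "account-42" key, the topic name and the parsedValues
counter are assumptions added for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

final class KeyFilterSketch {
    // Hypothetical key of interest; the thread does not name any keys.
    private static final Set<String> WANTED_KEYS = Collections.singleton("account-42");

    // Counts how often the (expensive) value parsing actually runs.
    static int parsedValues = 0;

    // Same parameter list as KeyedDeserializationSchema#deserialize.
    static Optional<String> deserialize(
            byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        if (messageKey == null || message == null) {
            return Optional.empty(); // nothing to match on, or nothing to parse
        }
        String key = new String(messageKey, StandardCharsets.UTF_8);
        if (!WANTED_KEYS.contains(key)) {
            return Optional.empty(); // the value bytes are never decoded
        }
        return Optional.of(parseValue(message));
    }

    // Placeholder for the real JSON parsing.
    private static String parseValue(byte[] message) {
        parsedValues++;
        return new String(message, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] value = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        Optional<String> kept = deserialize(
                "account-42".getBytes(StandardCharsets.UTF_8), value, "events", 0, 0L);
        Optional<String> dropped = deserialize(
                "account-7".getBytes(StandardCharsets.UTF_8), value, "events", 0, 1L);
        System.out.println(kept.isPresent() + " " + dropped.isPresent() + " " + parsedValues);
    }
}
```

Downstream, a filter step that keeps only the present values (and then unwraps
them) would complete the picture. The point of the sketch is just that the
value is parsed only after the key check has passed.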

Best,

Dawid
