Posted to user@flink.apache.org by hemant singh <he...@gmail.com> on 2020/03/18 10:48:20 UTC

Flink Schema Validation - Kafka

Hi Users,

Is there a way I can do schema validation when reading from Kafka in a
Flink job?

I have a pipeline like this:

Kafka Topic Raw (JSON data) -> Kafka Topic Avro (Avro data) -> Kafka Topic
Transformed (Avro data) -> Sink

While reading from the Raw topic I want to validate the schema so that, if
the check fails, I can push the event to an error topic. I understand from
the documentation [1] that events which cannot be deserialised are returned
as null and the consumer moves ahead (failing the consumer does not help,
as the record would be retried with the same result).
Is there a way I can filter out these records when they cannot be
deserialised?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
'When encountering a corrupted message that cannot be deserialised for any
reason, there are two options - either throwing an exception from the
deserialize(...) method which will cause the job to fail and be restarted,
or returning null to allow the Flink Kafka consumer to silently skip the
corrupted message.'
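
For illustration, a minimal sketch of the 'return null' option described
above, assuming a Jackson-based JSON parse (the class name and produced
type are only placeholders):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

public class LenientJsonDeserializationSchema
        extends AbstractDeserializationSchema<JsonNode> {

    private transient ObjectMapper mapper;

    @Override
    public JsonNode deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            return mapper.readTree(new String(message, StandardCharsets.UTF_8));
        } catch (Exception e) {
            // Returning null tells the Flink Kafka consumer to skip the record,
            // but the raw bytes are dropped here, so they cannot be forwarded
            // to an error topic from inside the DeserializationSchema.
            return null;
        }
    }
}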

Thanks,
Hemant

Re: Flink Schema Validation - Kafka

Posted by hemant singh <he...@gmail.com>.
Hi Robert,

Thanks for your reply. This helps; I was looking in a similar direction.

Thanks,
Hemant

On Wed, 18 Mar 2020 at 8:44 PM, Robert Metzger <rm...@apache.org> wrote:

> Hi Hemant,
>
> you could let the Kafka consumer deserialize your JSON data into a
> DataStream<String>, and then use a custom ProcessFunction to parse the
> string as JSON.
> In your custom function, you can handle errors more flexibly (for example,
> by outputting erroneous records through a side output).
>
> I hope this helps!
>
> Best,
> Robert
>
> On Wed, Mar 18, 2020 at 11:48 AM hemant singh <he...@gmail.com>
> wrote:
>
>> Hi Users,
>>
>> Is there a way I can do schema validation when reading from Kafka in a
>> Flink job?
>>
>> I have a pipeline like this:
>>
>> Kafka Topic Raw (JSON data) -> Kafka Topic Avro (Avro data) -> Kafka Topic
>> Transformed (Avro data) -> Sink
>>
>> While reading from the Raw topic I want to validate the schema so that, if
>> the check fails, I can push the event to an error topic. I understand from
>> the documentation [1] that events which cannot be deserialised are returned
>> as null and the consumer moves ahead (failing the consumer does not help,
>> as the record would be retried with the same result).
>> Is there a way I can filter out these records when they cannot be
>> deserialised?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
>> 'When encountering a corrupted message that cannot be deserialised for
>> any reason, there are two options - either throwing an exception from the
>> deserialize(...) method which will cause the job to fail and be
>> restarted, or returning null to allow the Flink Kafka consumer to
>> silently skip the corrupted message.'
>>
>> Thanks,
>> Hemant
>>
>

Re: Flink Schema Validation - Kafka

Posted by Robert Metzger <rm...@apache.org>.
Hi Hemant,

you could let the Kafka consumer deserialize your JSON data into a
DataStream<String>, and then use a custom ProcessFunction to parse the
string as JSON.
In your custom function, you can handle errors more flexibly (for example,
by outputting erroneous records through a side output).
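
A minimal sketch of this approach, assuming the raw topic is read with a
SimpleStringSchema and parsed with Jackson (class, tag and field names are
only placeholders):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class JsonValidatingFunction extends ProcessFunction<String, JsonNode> {

    // Side output for records that fail parsing / schema validation.
    public static final OutputTag<String> INVALID =
            new OutputTag<String>("invalid-records") {};

    private transient ObjectMapper mapper;

    @Override
    public void open(Configuration parameters) {
        mapper = new ObjectMapper();
    }

    @Override
    public void processElement(String raw, Context ctx, Collector<JsonNode> out) {
        try {
            JsonNode node = mapper.readTree(raw);
            // Additional checks (required fields, types, ...) could go here,
            // throwing an exception when the event does not match the schema.
            out.collect(node);
        } catch (Exception e) {
            // Route the invalid record to the side output instead of failing the job.
            ctx.output(INVALID, raw);
        }
    }
}

The main output of process(...) can then be converted to Avro and written
to the Avro topic, while the side output, obtained via
getSideOutput(JsonValidatingFunction.INVALID) on the returned stream, can
be written to the error topic with a separate Kafka producer.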

I hope this helps!

Best,
Robert

On Wed, Mar 18, 2020 at 11:48 AM hemant singh <he...@gmail.com> wrote:

> Hi Users,
>
> Is there a way I can do schema validation when reading from Kafka in a
> Flink job?
>
> I have a pipeline like this:
>
> Kafka Topic Raw (JSON data) -> Kafka Topic Avro (Avro data) -> Kafka Topic
> Transformed (Avro data) -> Sink
>
> While reading from the Raw topic I want to validate the schema so that, if
> the check fails, I can push the event to an error topic. I understand from
> the documentation [1] that events which cannot be deserialised are returned
> as null and the consumer moves ahead (failing the consumer does not help,
> as the record would be retried with the same result).
> Is there a way I can filter out these records when they cannot be
> deserialised?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
> 'When encountering a corrupted message that cannot be deserialised for
> any reason, there are two options - either throwing an exception from the
> deserialize(...) method which will cause the job to fail and be
> restarted, or returning null to allow the Flink Kafka consumer to
> silently skip the corrupted message.'
>
> Thanks,
> Hemant
>