Posted to user@flink.apache.org by John Smith <ja...@gmail.com> on 2019/10/11 21:35:28 UTC

Discard message on deserialization errors.

Hi, I'm using Flink 1.8.0.

I am ingesting data from Kafka; unfortunately, for the time being I have not
looked into using the schema registry.

So for now I would like to write a simple deserialization schema that
discards the data if deserialization fails.

The other option is to do it in a flat map with markers and split to a dead-letter
queue (rough sketch below), but I'm not too concerned about that for now.
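
Roughly what I have in mind for that route, as a sketch using side outputs rather
than the older split/select (assuming rawBytes is a DataStream<byte[]> read straight
from Kafka, and the tag name is arbitrary):

import java.io.IOException;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Arbitrary tag name; undecodable records are routed here as raw bytes.
final OutputTag<byte[]> deadLetterTag = new OutputTag<byte[]>("dead-letter") {};

SingleOutputStreamOperator<MyObject> parsed = rawBytes
   .process(new ProcessFunction<byte[], MyObject>() {
      @Override
      public void processElement(byte[] value, Context ctx, Collector<MyObject> out) {
         try {
            out.collect(MyDecoder.decode(value));
         } catch (IOException ex) {
            // Route the bad record to the side output instead of failing the job.
            ctx.output(deadLetterTag, value);
         }
      }
   });

// These could then be written to a separate "dead letter" Kafka topic.
DataStream<byte[]> deadLetters = parsed.getSideOutput(deadLetterTag);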

Is it ok to just return null if deserialization fails?

@Override
public MyObject deserialize(byte[] message) {
   try {
      return MyDecoder.decode(message);
   } catch (IOException ex) {
      // Log and skip the record by returning null.
      logger.warn("Failed to decode message.", ex);
      return null;
   }
}
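
For context, the full schema class would look roughly like this (the class name is
just an example; MyObject and MyDecoder stand in for my real types):

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyObjectDeserializationSchema implements DeserializationSchema<MyObject> {

   // static so the schema itself stays serializable
   private static final Logger logger =
         LoggerFactory.getLogger(MyObjectDeserializationSchema.class);

   @Override
   public MyObject deserialize(byte[] message) {
      try {
         return MyDecoder.decode(message);
      } catch (IOException ex) {
         // Log and skip the record by returning null.
         logger.warn("Failed to decode message.", ex);
         return null;
      }
   }

   @Override
   public boolean isEndOfStream(MyObject nextElement) {
      // Infinite stream; note that nextElement can be null here.
      return false;
   }

   @Override
   public TypeInformation<MyObject> getProducedType() {
      return TypeInformation.of(MyObject.class);
   }
}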

Re: Discard message on deserialization errors.

Posted by John Smith <ja...@gmail.com>.
Ah ok thanks!


Re: Discard message on deserialization errors.

Posted by Zhu Zhu <re...@gmail.com>.
I mean that the Kafka source provided in Flink correctly ignores null
deserialized values.
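
For example, a schema that returns null on bad records can be plugged into the
Kafka source like this (just a sketch using the universal flink-connector-kafka;
the topic, properties, and class names are placeholders, and
MyObjectDeserializationSchema stands for your null-returning schema):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaIngestJob {

   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
      props.setProperty("group.id", "my-consumer-group");       // placeholder

      // Records for which deserialize() returned null are simply skipped by the source.
      DataStream<MyObject> stream = env.addSource(
            new FlinkKafkaConsumer<>("my-topic", new MyObjectDeserializationSchema(), props));

      stream.print(); // placeholder sink
      env.execute("kafka-ingest");
   }
}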

isEndOfStream allows you to control when to end the input stream.
If the schema is used for infinite streaming jobs, you can simply return false.

Thanks,
Zhu Zhu


Re: Discard message on deserialization errors.

Posted by John Smith <ja...@gmail.com>.
By the Kafka fetcher, do you mean the Flink JSON schemas? Do they throw IOExceptions?

Also, what's the purpose of isEndOfStream? Most schemas I looked at don't do
anything but return false.


Re: Discard message on deserialization errors.

Posted by Zhu Zhu <re...@gmail.com>.
Hi John,

It should work with a *null* return value.
The Javadoc of DeserializationSchema#deserialize says:

> *@return The deserialized message as an object (null if the message cannot
> be deserialized).*


I also checked the Kafka fetcher in Flink and it can correctly handle a
null deserialized record.

Just make sure that *DeserializationSchema#isEndOfStream* does not throw an error
when a null record is passed to it.
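
For example, if your schema ever ends the stream based on a marker record, a
null-safe check could look like this (isEndMarker() is just a made-up example method):

@Override
public boolean isEndOfStream(MyObject nextElement) {
   // nextElement may be the null returned by deserialize(), so check it first.
   return nextElement != null && nextElement.isEndMarker();
}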

Thanks,
Zhu Zhu
