Posted to user@flink.apache.org by Yun Gao <yu...@aliyun.com> on 2020/06/01 06:15:32 UTC

Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

Hi Yu,

I think when the serializer returns null, the downstream operator should still receive a null record. One possible approach is for that operator to count the number of null records it receives and publish the count as a metric to a monitoring system such as Prometheus, where alert conditions can be configured.

If null causes problems, a special sentinel instance such as NULL_TBASE could be created instead, and the operator could count the number of NULL_TBASE records it receives.
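A minimal sketch of such a counting operator, assuming the serializer returns null (the NULL_TBASE check and the metric name corruptedRecords are illustrative, not an existing API):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;
import org.apache.thrift.TBase;

public class CorruptedRecordFilter extends RichFlatMapFunction<TBase, TBase> {
    private transient Counter corruptedRecords;

    @Override
    public void open(Configuration parameters) {
        // Exposed through whichever metric reporter is configured,
        // e.g. Flink's Prometheus reporter.
        corruptedRecords = getRuntimeContext()
                .getMetricGroup()
                .counter("corruptedRecords");
    }

    @Override
    public void flatMap(TBase record, Collector<TBase> out) {
        if (record == null) { // or record == NULL_TBASE for the sentinel variant
            corruptedRecords.inc(); // alert on this counter in the monitoring system
        } else {
            out.collect(record);
        }
    }
}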

Best,
 Yun



------------------ Original Mail ------------------
Sender: Yu Yang <yu...@gmail.com>
Send Date: Mon Jun 1 06:37:35 2020
Recipients: user <us...@flink.apache.org>
Subject: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

Hi all, 

To deal with corrupted messages that can leak into the data source once in a while, we implemented a custom DefaultKryoSerializer class, shown below, that catches exceptions. The custom serializer returns null from its read(...) method when it encounters an exception while reading. With this implementation, the serializer may silently drop records. One concern is that it may drop too many records before we notice and take action. What is the best practice for handling this?

The serializer processes one record at a time. Will reading a corrupted record make the serializer fail to process the next valid record?

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.twitter.chill.thrift.TBaseSerializer; // assuming the chill-thrift TBaseSerializer
import org.apache.thrift.TBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomTBaseSerializer extends TBaseSerializer {
    private static final Logger LOG = LoggerFactory.getLogger(CustomTBaseSerializer.class);

    @Override
    public void write(Kryo kryo, Output output, TBase tBase) {
        try {
            super.write(kryo, output, tBase);
        } catch (Throwable t) {
            LOG.error("Failed to write due to unexpected Throwable", t);
        }
    }

    @Override
    public TBase read(Kryo kryo, Input input, Class<TBase> tBaseClass) {
        try {
            return super.read(kryo, input, tBaseClass);
        } catch (Throwable t) {
            LOG.error("Failed to read from input due to unexpected Throwable", t);
            return null;
        }
    }
}
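For reference, we register the serializer with the execution environment along these lines (a sketch; the actual setup is abbreviated):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.thrift.TBase;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use the custom serializer as Kryo's default for all TBase subtypes.
env.getConfig().addDefaultKryoSerializer(TBase.class, CustomTBaseSerializer.class);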

Thank you!

Regards, 
-Yu

Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

Posted by Arvid Heise <ar...@ververica.com>.
A common approach is to use a dead letter queue, which is an extra output for bad input.

So the result of the read operation would look like Tuple2<TBase, byte[]> (or an Either in Scala), carrying the parsed TBase on success, or else the raw byte[] of the invalid record.
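One place this read operation could live is the source's deserialization step rather than inside the Kryo serializer; a sketch, assuming a Thrift-generated type MyEvent (the class name and fallback scheme here are illustrative):

import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.thrift.TDeserializer;

public class DeadLetterDeserializationSchema
        implements DeserializationSchema<Tuple2<MyEvent, byte[]>> {

    @Override
    public Tuple2<MyEvent, byte[]> deserialize(byte[] message) throws IOException {
        try {
            MyEvent event = new MyEvent();
            new TDeserializer().deserialize(event, message);
            return Tuple2.of(event, null); // parsed successfully
        } catch (Throwable t) {
            return Tuple2.of(null, message); // keep the raw bytes for the dead letter queue
        }
    }

    @Override
    public boolean isEndOfStream(Tuple2<MyEvent, byte[]> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Tuple2<MyEvent, byte[]>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<MyEvent, byte[]>>() {});
    }
}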

Then in your DAG, split the handling of the input:

DataStream<Tuple2<TBase, byte[]>> input = ...;
DataStream<TBase> goodInput = input
        .filter(recordOrRaw -> recordOrRaw.f0 != null)
        .map(recordOrRaw -> recordOrRaw.f0);
// continue normal processing with goodInput
DataStream<byte[]> badInput = input
        .filter(recordOrRaw -> recordOrRaw.f1 != null)
        .map(recordOrRaw -> recordOrRaw.f1);
badInput.write... // for example to Kafka

Then you simply need to monitor the output of badInput. You can also inspect why those records could not be parsed, and try to develop recovery logic where possible.
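A sketch of the dead letter sink itself, writing the raw bytes back to Kafka (the topic name and broker address are placeholders; assumes the flink-connector-kafka dependency):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder

// The bad records are already raw bytes, so the schema passes them through.
SerializationSchema<byte[]> passThrough = bytes -> bytes;

badInput.addSink(new FlinkKafkaProducer<>("dead-letter-topic", passThrough, props));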


-- 

Arvid Heise | Senior Java Developer

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

Re: best practice for handling corrupted records / exceptions in custom DefaultKryoSerializer?

Posted by Yu Yang <yu...@gmail.com>.
Thanks for the suggestion, Yun!
