Posted to user@flume.apache.org by ed <ed...@gmail.com> on 2014/01/01 03:34:32 UTC

Handling malformed data when using custom AvroEventSerializer and HDFS Sink

Hello,

We are using Flume v1.4 to load JSON-formatted log data into HDFS as Avro.
Our Flume setup looks like this:

NXLog ==> (FlumeHTTPSource -> HDFSSink w/ custom EventSerializer)

Right now our custom EventSerializer (which extends
AbstractAvroEventSerializer) takes the JSON input from the HTTPSource and
converts it into an avro record of the appropriate type for the incoming
log file.  This is working great and we use the serializer to add some
additional "synthetic" fields to the avro record that don't exist in the
original JSON log data.

My question concerns how to handle malformed JSON data (or really any error
inside of the custom EventSerializer).  It's very likely that as we parse
the JSON there will be records where something is malformed (either the
JSON itself is invalid, or a field has the wrong type).

For example, a "port" field which should always be an Integer might for
some reason have some ASCII text in it.  I'd like to catch these errors in
the EventSerializer and then write out the bad JSON to a log file somewhere
that we can monitor.

What is the best way to do this?  Right now, all the logic for catching bad
JSON would be inside of the "convert" function of the EventSerializer.
Should the convert function itself throw an exception that will be
gracefully handled upstream, or do I just return a "null" value if there was
an error?  Would it be appropriate to log errors directly to a database
from inside the EventSerializer convert method, or would this be too slow?
What are the best practices for this type of error handling?

Thank you for any assistance!

Best Regards,

Ed

Re: Handling malformed data when using custom AvroEventSerializer and HDFS Sink

Posted by Devin Suiter RDX <ds...@rdx.com>.
Just throwing this out there, since I haven't had time to dig into the API
with a big fork: can morphlines offer any assistance here?

Perhaps some kind of interceptor that parses for malformed data, packages
the offending data and sends it somewhere (email it, log it), and then
projects a valid "there was something wrong here" value into the field so
that your channel can carry on? Or skip the projection piece and just move
along? My thinking is that projecting a known value into a field that
previously held malformed data would let you easily locate those records
later, while keeping your data shape consistent.
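
The "project a known value" idea can be sketched in plain Java, independent
of morphlines or the Flume interceptor API. The class name PortSanitizer,
the sentinel value -1, and the "rejected" list are all assumptions made up
for illustration; a real interceptor would apply similar logic to each
event body and ship the rejected values wherever they need to go.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: replaces a malformed "port" value with a sentinel. */
class PortSanitizer {
    /** Assumed marker meaning "there was something wrong here". */
    static final int BAD_PORT = -1;

    /** Raw values that failed validation, kept so they can be logged or emailed. */
    final List<String> rejected = new ArrayList<>();

    /** Returns the parsed port, or BAD_PORT if the raw text is not a valid port. */
    int sanitize(String raw) {
        try {
            int port = Integer.parseInt(raw.trim());
            if (port >= 0 && port <= 65535) {
                return port;
            }
        } catch (NumberFormatException | NullPointerException e) {
            // Not an integer (or missing entirely): fall through to the sentinel.
        }
        rejected.add(raw);
        return BAD_PORT;
    }
}
```

Records can then be found later by filtering on the sentinel, and the
field keeps its integer type throughout.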

Kind of looking to Brock as a sounding board as to the appropriateness of
this as a potential solution since morphlines takes some time to really
understand well...

*Devin Suiter*
Jr. Data Solutions Software Engineer
100 Sandusky Street | 2nd Floor | Pittsburgh, PA 15212
Google Voice: 412-256-8556 | www.rdx.com


On Thu, Jan 2, 2014 at 10:25 AM, Brock Noland <br...@cloudera.com> wrote:

>
> On Tue, Dec 31, 2013 at 8:34 PM, ed <ed...@gmail.com> wrote:
>
>> Hello,
>>
>> We are using Flume v1.4 to load JSON formatted log data into HDFS as
>> Avro.  Our flume setup looks like this:
>>
>> NXLog ==> (FlumeHTTPSource -> HDFSSink w/ custom EventSerializer)
>>
>> Right now our custom EventSerializer (which extends
>> AbstractAvroEventSerializer) takes the JSON input from the HTTPSource and
>> converts it into an avro record of the appropriate type for the incoming
>> log file.  This is working great and we use the serializer to add some
>> additional "synthetic" fields to the avro record that don't exist in the
>> original JSON log data.
>>
>> My question concerns how to handle malformed JSON data (or really any
>> error inside of the custom EventSerializer).  It's very likely that as we
>> parse the JSON there will be records where something is malformed (either
>> the JSON itself, or a field is of the wrong type etc.).
>>
>> For example, a "port" field which should always be an Integer might for
>> some reason have some ASCII text in it.  I'd like to catch these errors in
>> the EventSerializer and then write out the bad JSON to a log file somewhere
>> that we can monitor.
>>
>
> Yeah it would be nice to have a better story about this in Flume.
>
>
>>
>> What is the best way to do this?
>>
>
> Typically people will either log it to a file or send it through another
> "flow" to a different HDFS sink.
>
>
>
>> Right now, all the logic for catching bad JSON would be inside of the
>> "convert" function of the EventSerializer.  Should the convert function
>> itself throw an exception that will be gracefully handled upstream
>>
>
> The exception will be logged, but that is it.
>
>
>> or do I just return a "null" value if there was an error?  Would it be
>> appropriate to log errors directly to a database from inside the
>> EventSerializer convert method or would this be too slow?
>>
>
> That might be too slow to do directly. If I did that, I'd have a separate
> thread do the database writes, with an in-memory queue between the
> serializer and that thread.
>
>
>> What are the best practices for this type of error handling?
>>
>
> It looks to me like we'd need to change AbstractAvroEventSerializer to
> filter out nulls:
>
>
> https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/serialization/AbstractAvroEventSerializer.java#L106
>
> which we could easily do.  Since you don't want to wait for that, you could
> override the write method to do this.
>
>
>>
>> Thank you for any assistance!
>>
>> Best Regards,
>>
>> Ed
>>
>
>
>
> --
> Apache MRUnit - Unit testing MapReduce - http://mrunit.apache.org
>

Re: Handling malformed data when using custom AvroEventSerializer and HDFS Sink

Posted by Brock Noland <br...@cloudera.com>.
On Tue, Dec 31, 2013 at 8:34 PM, ed <ed...@gmail.com> wrote:

> Hello,
>
> We are using Flume v1.4 to load JSON formatted log data into HDFS as Avro.
>  Our flume setup looks like this:
>
> NXLog ==> (FlumeHTTPSource -> HDFSSink w/ custom EventSerializer)
>
> Right now our custom EventSerializer (which extends
> AbstractAvroEventSerializer) takes the JSON input from the HTTPSource and
> converts it into an avro record of the appropriate type for the incoming
> log file.  This is working great and we use the serializer to add some
> additional "synthetic" fields to the avro record that don't exist in the
> original JSON log data.
>
> My question concerns how to handle malformed JSON data (or really any
> error inside of the custom EventSerializer).  It's very likely that as we
> parse the JSON there will be records where something is malformed (either
> the JSON itself, or a field is of the wrong type etc.).
>
> For example, a "port" field which should always be an Integer might for
> some reason have some ASCII text in it.  I'd like to catch these errors in
> the EventSerializer and then write out the bad JSON to a log file somewhere
> that we can monitor.
>

Yeah it would be nice to have a better story about this in Flume.


>
> What is the best way to do this?
>

Typically people will either log it to a file or send it through another
"flow" to a different HDFS sink.



> Right now, all the logic for catching bad JSON would be inside of the
> "convert" function of the EventSerializer.  Should the convert function
> itself throw an exception that will be gracefully handled upstream
>

The exception will be logged, but that is it.


> or do I just return a "null" value if there was an error?  Would it be
> appropriate to log errors directly to a database from inside the
> EventSerializer convert method or would this be too slow?
>

That might be too slow to do directly. If I did that, I'd have a separate
thread do the database writes, with an in-memory queue between the
serializer and that thread.
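
A minimal sketch of that thread-plus-queue arrangement, using only the JDK.
The class name AsyncErrorReporter, the bounded capacity, and the "slowSink"
callback (standing in for the database write) are assumptions for
illustration, not part of Flume.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: hands bad records to a background thread via a bounded queue. */
class AsyncErrorReporter implements AutoCloseable {
    private final BlockingQueue<String> queue;
    private final Thread worker;
    private volatile boolean running = true;

    AsyncErrorReporter(int capacity, Consumer<String> slowSink) {
        this.queue = new LinkedBlockingQueue<>(capacity);
        this.worker = new Thread(() -> {
            try {
                // Keep draining until close() has been called and the queue is empty.
                while (running || !queue.isEmpty()) {
                    String record = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        slowSink.accept(record); // e.g. a database insert
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "bad-record-reporter");
        worker.setDaemon(true);
        worker.start();
    }

    /** Never blocks the caller; returns false if the queue is full and the record was dropped. */
    boolean report(String badRecord) {
        return queue.offer(badRecord);
    }

    /** Stops accepting work and waits for the queued records to be written. */
    @Override
    public void close() {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The serializer would call report() from inside convert and carry on
immediately. Using offer() rather than put() means a slow database costs
dropped error records instead of stalling the sink; whether that trade-off
is acceptable depends on how important the bad records are.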


> What are the best practices for this type of error handling?
>

It looks to me like we'd need to change AbstractAvroEventSerializer to
filter out nulls:

https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/serialization/AbstractAvroEventSerializer.java#L106

which we could easily do.  Since you don't want to wait for that, you could
override the write method to do this.
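
The shape of such an override can be sketched without Flume on the
classpath. SketchSerializer below is a made-up stand-in for the base class
(its "appended" list stands in for the Avro file writer), and
NullSkippingSerializer is a hypothetical subclass; neither is the actual
Flume API. In the real class, check whether the underlying writer is
accessible to subclasses before copying this pattern directly.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for the base serializer: converts an event body and appends the result. */
abstract class SketchSerializer<T> {
    /** Stands in for the Avro data file writer. */
    final List<T> appended = new ArrayList<>();

    abstract T convert(String eventBody);

    void write(String eventBody) {
        // No null check here, mirroring the behaviour discussed above.
        appended.add(convert(eventBody));
    }
}

/** Overrides write() so that a null from convert() is skipped instead of appended. */
class NullSkippingSerializer extends SketchSerializer<Integer> {
    /** Bodies that failed conversion, kept for logging or monitoring. */
    final List<String> skipped = new ArrayList<>();

    @Override
    Integer convert(String eventBody) {
        try {
            return Integer.valueOf(eventBody.trim());
        } catch (NumberFormatException e) {
            return null; // signals a malformed record
        }
    }

    @Override
    void write(String eventBody) {
        Integer record = convert(eventBody);
        if (record == null) {
            skipped.add(eventBody); // record the failure and move on
            return;
        }
        appended.add(record);
    }
}
```

With this arrangement convert only has to return null on failure, and the
decision about what to do with the bad record lives in one place.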


>
> Thank you for any assistance!
>
> Best Regards,
>
> Ed
>



-- 
Apache MRUnit - Unit testing MapReduce - http://mrunit.apache.org