Posted to user@avro.apache.org by Daniel Haviv <da...@veracity-group.com> on 2015/09/25 18:18:43 UTC

Parsing avro binary data from Spark Streaming

Hi,
I'm receiving Avro data from Kafka in my Spark Streaming app.
When reading the data directly from disk I would just parse it as follows:

val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable,
  AvroInputFormat[GenericRecord]]("/incoming_1k").coalesce(10)
val txtRDD = avroRDD.map(l => l._1.datum.toString)

I would like to do the same with Avro data coming in from Kafka, so I'm
doing the following:

val avroStream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
  DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicSet)

This leaves me with a byte array, and I can't find any example of how to
convert it into either a GenericRecord or my Avro-generated class.

Any help will be appreciated.

Daniel

Re: Parsing avro binary data from Spark Streaming

Posted by Maulik Gandhi <mm...@gmail.com>.
Hi Daniel,


The code snippet below should help:

import java.io.IOException;
import java.lang.reflect.Method;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBuilderBase;

// Deserializes a bare Avro binary payload (no container-file header)
// into an instance of the given generated class.
public <T extends SpecificRecord> T fromBytes(final byte[] bytes, final Class<T> clazz) {
    final BinaryDecoder decoder =
            DecoderFactory.get().binaryDecoder(bytes, 0, bytes.length, null);
    final DatumReader<T> datumReader = new SpecificDatumReader<T>(clazz);
    try {
        // datumReader.read already yields the record; generated Avro classes
        // also expose a static newBuilder(T other) factory, which we invoke
        // reflectively here to copy the decoded datum into a fresh record.
        final Method newBuilder = clazz.getMethod("newBuilder", clazz);
        return clazz.cast(((SpecificRecordBuilderBase<?>) newBuilder
                .invoke(null, datumReader.read(null, decoder))).build());
    } catch (final ReflectiveOperationException | SecurityException
            | IllegalArgumentException | IOException e) {
        throw new IllegalStateException("Unable to deserialize Avro " + clazz, e);
    }
}
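
In your streaming job you could then map each message value through it,
with MyRecord standing in for your generated class:

val specificStream = avroStream.map { case (_, bytes) =>
  fromBytes(bytes, classOf[MyRecord])
}

If you want a GenericRecord instead of a generated class, a minimal
(untested) sketch along these lines should work. It assumes the Kafka
payload is the bare Avro binary encoding and that you have the writer
schema as a JSON string; schemaJson is a placeholder for wherever you load
your .avsc from. Avro Schema objects are not serializable, so the schema
is parsed once per partition rather than captured in the closure:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

val schemaJson: String = ??? // placeholder: contents of your .avsc

val recordStream = avroStream.mapPartitions { iter =>
  // Build one schema and one reusable reader per partition.
  val schema = new Schema.Parser().parse(schemaJson)
  val reader = new GenericDatumReader[GenericRecord](schema)
  iter.map { case (_, bytes) =>
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }
}

// Same toString mapping as in the disk-based version:
val txtStream = recordStream.map(_.toString)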


Thanks.

- Maulik

