Posted to user@avro.apache.org by Daniel Haviv <da...@veracity-group.com> on 2015/09/25 18:18:43 UTC
Parsing avro binary data from Spark Streaming
Hi,
I'm receiving Avro data from Kafka in my Spark Streaming app.
When reading the data directly from disk, I would have used the
following to parse it:
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable,
AvroInputFormat[GenericRecord]]("/incoming_1k").coalesce(10)
val txtRDD = avroRDD.map(l => {l._1.datum.toString} )
I would like to do the same with Avro data coming in from Kafka, so I'm
doing the following:
val avroStream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicSet)
This leaves me with a byte array, and I can't find any example of how to
convert a byte array to either a GenericRecord or my Avro class.
Any help will be appreciated.
Daniel
Re: Parsing avro binary data from Spark Streaming
Posted by Maulik Gandhi <mm...@gmail.com>.
Hi Daniel,
The code snippet below should help:
import java.io.IOException;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;

// Decodes one raw Avro-encoded record (no container-file header)
// into an instance of the generated class clazz.
public static <T extends SpecificRecord> T fromBytes(final byte[] bytes,
        final Class<T> clazz) {
    final BinaryDecoder decoder =
        DecoderFactory.get().binaryDecoder(bytes, 0, bytes.length, null);
    // The reader takes its schema from the generated class.
    final DatumReader<T> datumReader = new SpecificDatumReader<T>(clazz);
    try {
        // read() already returns an instance of clazz, so there is no need
        // to copy it through the generated newBuilder(...) via reflection.
        return datumReader.read(null, decoder);
    } catch (final IOException e) {
        throw new IllegalStateException(
            "Unable to deserialize avro " + clazz.getName(), e);
    }
}
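For the GenericRecord half of the question, the Avro Java library's usual route is the same BinaryDecoder fed to a `new GenericDatumReader<GenericRecord>(schema)`, where `schema` is the writer's schema. To show why a schema is unavoidable, here is a dependency-free sketch that hand-decodes one record following the Avro binary encoding rules (int and long as zigzag varints, string as a length followed by UTF-8 bytes, record fields back to back in schema order). The `User` schema, the class `AvroBinarySketch` and its methods are hypothetical illustrations, not part of the thread; real code should use the library readers rather than hand-rolled parsing.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hand-decodes Avro binary data for a HYPOTHETICAL schema:
//   {"type": "record", "name": "User",
//    "fields": [{"name": "name", "type": "string"},
//               {"name": "age", "type": "int"}]}
// The bytes carry no field names or types, so the reader must already know
// the writer's schema; here it is hard-coded in decodeUser.
class AvroBinarySketch {
    private final byte[] buf;
    private int pos = 0;

    AvroBinarySketch(byte[] buf) {
        this.buf = buf;
    }

    // Avro int and long: zigzag-encoded varint, 7 bits per byte,
    // low-order group first, high bit set on every byte except the last.
    long readLong() {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            if (pos >= buf.length || shift >= 64) {
                throw new IllegalArgumentException("truncated or oversized varint");
            }
            b = buf[pos++] & 0xFF;
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        // Undo zigzag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
        return (raw >>> 1) ^ -(raw & 1);
    }

    // Avro string: a long byte count followed by that many UTF-8 bytes.
    String readString() {
        long len = readLong();
        if (len < 0 || len > buf.length - pos) {
            throw new IllegalArgumentException("bad string length " + len);
        }
        String s = new String(buf, pos, (int) len, StandardCharsets.UTF_8);
        pos += (int) len;
        return s;
    }

    // A record is simply its fields encoded back to back, in schema order.
    static Map<String, Object> decodeUser(byte[] bytes) {
        AvroBinarySketch in = new AvroBinarySketch(bytes);
        Map<String, Object> user = new LinkedHashMap<>();
        user.put("name", in.readString());
        user.put("age", (int) in.readLong());
        return user;
    }

    public static void main(String[] args) {
        // "Ann": length 3 (zigzag 0x06) + 3 UTF-8 bytes; age 30: zigzag 60 = 0x3C.
        byte[] bytes = {0x06, 'A', 'n', 'n', 0x3C};
        System.out.println(decodeUser(bytes)); // prints {name=Ann, age=30}
    }
}
```

Decoding the same five bytes with a different schema would silently produce different values, which is why the Kafka consumer needs the exact schema the producer wrote with.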
Thanks.
- Maulik