Posted to user@spark.apache.org by Mohammad Tariq <do...@gmail.com> on 2016/02/25 20:06:56 UTC

Access fields by name/index from Avro data read from Kafka through Spark Streaming

Hi group,

I have just started working with Confluent Platform and Spark Streaming, and
was wondering if it is possible to access individual fields from an Avro
object read from a Kafka topic through Spark Streaming. By default,
*KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder,
KafkaAvroDecoder](ssc, kafkaParams, topicsSet)* returns a
*DStream[(Object, Object)]*, and *Object* has no schema associated with it
(or I am unable to figure it out). This makes it impossible to perform some
operations on this DStream, for example converting it to a Spark DataFrame.

Since *KafkaAvroDecoder* doesn't allow us to use any class other than
*Object*, I think I am going in the wrong direction. Any
pointers/suggestions would be really helpful.

*Versions used:*
confluent-1.0.1
spark-1.6.0-bin-hadoop2.4
Scala code runner version - 2.11.6

And this is the small piece of code I am using:

package org.myorg.scalaexamples

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.SparkContext
import org.apache.avro.mapred.AvroKey
import org.apache.spark.sql.SQLContext
//import org.apache.avro.mapred.AvroValue
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel
import org.apache.avro.generic.GenericRecord
import org.apache.spark.streaming.dstream.DStream
import io.confluent.kafka.serializers.KafkaAvroDecoder
//import org.apache.hadoop.io.serializer.avro.AvroRecord
//import org.apache.spark.streaming.dstream.ForEachDStream
import org.apache.kafka.common.serialization.Deserializer

object DirectKafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more Kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }
    val Array(brokers, topics) = args
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[1]")
    sparkConf.registerKryoClasses(Array(classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]]))
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> "consumer",
      "zookeeper.connect" -> "localhost:2181",
      "schema.registry.url" -> "http://localhost:8081")
    val messages = KafkaUtils.createDirectStream[Object, Object,
      KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
    messages.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Thank you so much for your valuable time!


Tariq, Mohammad
about.me/mti
<http://about.me/mti>

Re: Access fields by name/index from Avro data read from Kafka through Spark Streaming

Posted by Harsh J <ha...@cloudera.com>.
You should be able to cast the object to its real underlying type:
GenericRecord if the decoder is in generic mode (which is the default), or
the generated type class if it is in specific mode. The underlying
implementation of KafkaAvroDecoder appears to use one or the other depending
on a config switch:
https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L206-L218
and
https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializerConfig.java#L27-L28

Once you have the right underlying class, extracting fields is direct and
does not need an intermediate transformation to JSON.
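
For what it's worth, a minimal sketch of that casting approach (assuming the
default generic mode; "COL_NAME" is just a placeholder field name, and
`messages` is the DStream from the code above) might look like this:

import org.apache.avro.generic.GenericRecord

// Cast each decoded value to GenericRecord and read a field out by name.
val records = messages.map { case (_, value) => value.asInstanceOf[GenericRecord] }
val colValues = records.map(r => Option(r.get("COL_NAME")).map(_.toString).orNull)
colValues.print()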

On Fri, 26 Feb 2016 at 06:00 Mohammad Tariq <do...@gmail.com> wrote:

> I got it working by using jsonRDD. This is what I had to do in order to
> make it work :
>
>       val messages = KafkaUtils.createDirectStream[Object, Object,
>         KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
>       val lines = messages.map(_._2.toString)
>       lines.foreachRDD(jsonRDD => {
>         val sqlContext = SQLContextSingleton.getInstance(jsonRDD.sparkContext)
>         val data = sqlContext.read.json(jsonRDD)
>         data.printSchema()
>         data.show()
>         data.select("COL_NAME").show()
>         data.groupBy("COL_NAME").count().show()
>       })
>
> Not sure though if it's the best way to achieve this.

Re: Access fields by name/index from Avro data read from Kafka through Spark Streaming

Posted by Mohammad Tariq <do...@gmail.com>.
I got it working by converting the values to JSON strings and reading them
back with sqlContext.read.json. This is what I had to do in order to make it work:

      val messages = KafkaUtils.createDirectStream[Object, Object,
        KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicsSet)
      val lines = messages.map(_._2.toString)
      lines.foreachRDD(jsonRDD => {
        val sqlContext = SQLContextSingleton.getInstance(jsonRDD.sparkContext)
        val data = sqlContext.read.json(jsonRDD)
        data.printSchema()
        data.show()
        data.select("COL_NAME").show()
        data.groupBy("COL_NAME").count().show()
      })

Not sure though if it's the best way to achieve this.
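
(For completeness: SQLContextSingleton is not shown above; presumably it is a
lazily initialised helper along the lines of the one in the Spark Streaming
programming guide. A minimal sketch of such a helper:)

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Lazily create a single SQLContext per JVM and reuse it across batches.
object SQLContextSingleton {
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}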



Tariq, Mohammad
about.me/mti
<http://about.me/mti>


On Fri, Feb 26, 2016 at 5:21 AM, Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:

> You can use `DStream.map` to transform objects to anything you want.

Re: Access fields by name/index from Avro data read from Kafka through Spark Streaming

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
You can use `DStream.map` to transform objects to anything you want.
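
For instance (a rough sketch, assuming the decoded values are Avro
GenericRecord instances and using hypothetical field names FIELD_A and
FIELD_B), such a map could project just the fields you need into a tuple:

import org.apache.avro.generic.GenericRecord

// Map each (key, value) pair to a tuple of two hypothetical fields.
val projected = messages.map { case (_, v) =>
  val rec = v.asInstanceOf[GenericRecord]
  (Option(rec.get("FIELD_A")).map(_.toString).orNull,
   Option(rec.get("FIELD_B")).map(_.toString).orNull)
}
projected.print()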
