Posted to user@spark.apache.org by shyla deshpande <de...@gmail.com> on 2016/11/17 19:30:34 UTC

Spark 2.0.2, Structured Streaming with kafka source... Unable to parse the value to Object..

// Requires the spark-sql-kafka-0-10 package on the classpath
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local")
  .appName("spark session example")
  .getOrCreate()

import spark.implicits._

val dframe1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "student")
  .load()

How do I deserialize the value column of dframe1, which is Array[Byte], into a
Student object using Student.parseFrom?

Please help.

Thanks.



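// For reference, the DStream version (spark-streaming-kafka-0-10); assumes ssc and kafkaParams are defined
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Stream of votes from Kafka as bytes
val votesAsBytes = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc, LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, Array[Byte]](Array("votes"), kafkaParams))

// Parse them into Vote case class
val votes: DStream[Vote] = votesAsBytes.map {
  (cr: ConsumerRecord[String, Array[Byte]]) => Vote.parseFrom(cr.value())
}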
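Both snippets above boil down to one per-record step: turning the Array[Byte] stored in Kafka's value field into an object. The real Student.parseFrom and Vote.parseFrom presumably come from generated protobuf code that is not shown in this thread. The sketch below uses a hypothetical, hand-rolled Student (an Int id plus a String name, encoded with java.io data streams rather than protobuf) purely so the bytes-to-object round trip runs without Spark or Kafka.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical stand-in for a protobuf-generated Student class.
// The binary layout (Int id, then a length-prefixed UTF string) is for illustration only.
final case class Student(id: Int, name: String) {
  // Encode the fields into the bytes a producer would write to the Kafka topic.
  def toByteArray: Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeInt(id)
    out.writeUTF(name)
    out.flush()
    buffer.toByteArray
  }
}

object Student {
  // Decode the bytes found in the Kafka "value" column back into a Student.
  def parseFrom(bytes: Array[Byte]): Student = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    Student(in.readInt(), in.readUTF())
  }
}

object StudentCodecDemo {
  def main(args: Array[String]): Unit = {
    val payload = Student(7, "Asha").toByteArray
    println(Student.parseFrom(payload)) // prints Student(7,Asha)
  }
}
```

The only contract that matters is that the consumer's parseFrom matches whatever encoding the producer used; to Spark, the value column is just binary.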

Re: Spark 2.0.2, Structured Streaming with kafka source... Unable to parse the value to Object..

Posted by Michael Armbrust <mi...@databricks.com>.
You could also do this with Datasets, which will probably be a little more
efficient (since you are telling us you only care about one column):

ds1.select($"value".as[Array[Byte]]).map(Student.parseFrom)
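Michael's version, like the working code later in the thread, calls Student.parseFrom on every payload, so a single malformed message makes the map throw, which would typically fail the streaming query. A common way to tolerate bad records is to wrap the parser in scala.util.Try and use flatMap to keep only successful parses. The sketch below shows that pattern on a plain Seq so it runs without Spark; the comma-separated Student format is a hypothetical stand-in for protobuf. On a Dataset the same lambda should fit Dataset.flatMap, with an Encoder[Student] in scope (for example via spark.implicits._ when Student is a case class).

```scala
import java.nio.charset.StandardCharsets
import scala.util.Try

// Hypothetical decoder standing in for Student.parseFrom: it expects "id,name"
// in UTF-8 and throws on anything else, much as a generated parser throws on bad input.
final case class Student(id: Int, name: String)

object Student {
  def parseFrom(bytes: Array[Byte]): Student =
    new String(bytes, StandardCharsets.UTF_8).split(",", 2) match {
      case Array(id, name) => Student(id.trim.toInt, name) // toInt may also throw
      case _               => throw new IllegalArgumentException("malformed Student payload")
    }
}

object TolerantDecodeDemo {
  // Try turns a thrown exception into a Failure; toOption maps it to None,
  // and flatMap drops the Nones, so only successfully parsed records remain.
  def decodeAll(values: Seq[Array[Byte]]): Seq[Student] =
    values.flatMap(bytes => Try(Student.parseFrom(bytes)).toOption)

  def main(args: Array[String]): Unit = {
    val values = Seq("1,Asha", "not-a-student", "2,Ravi").map(_.getBytes(StandardCharsets.UTF_8))
    decodeAll(values).foreach(println)
  }
}
```

Dropping bad records silently can hide data problems, so in practice it is often worth counting or logging the failures as well.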

On Thu, Nov 17, 2016 at 1:05 PM, shyla deshpande <de...@gmail.com>
wrote:


Re: Spark 2.0.2, Structured Streaming with kafka source... Unable to parse the value to Object..

Posted by shyla deshpande <de...@gmail.com>.
Hello everyone,
 The following code works ...

import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {

  val spark = SparkSession.builder
    .master("local")
    .appName("spark session example")
    .getOrCreate()

  import spark.implicits._

  val ds1 = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "student")
    .load()

  // Pull the raw Kafka payload out of each Row, then decode it into a Student
  val ds2 = ds1.map(row => row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))

  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()

  query.awaitTermination()

}


On Thu, Nov 17, 2016 at 11:30 AM, shyla deshpande <de...@gmail.com>
wrote:
