Posted to user@spark.apache.org by shyla deshpande <de...@gmail.com> on 2016/11/17 19:30:34 UTC
Spark 2.0.2, Structured Streaming with kafka source... Unable to
parse the value to Object..
val spark = SparkSession.builder.
  master("local")
  .appName("spark session example")
  .getOrCreate()

import spark.implicits._

val dframe1 = spark.readStream.format("kafka").
  option("kafka.bootstrap.servers","localhost:9092").
  option("subscribe","student").load()

How do I deserialize the value column of dframe1, which is Array[Byte],
into a Student object using Student.parseFrom?
Please help.
Thanks.

// Stream of votes from Kafka as bytes
val votesAsBytes = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc, LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, Array[Byte]](Array("votes"), kafkaParams))

// Parse them into Vote case class.
val votes: DStream[Vote] = votesAsBytes.map {
  (cr: ConsumerRecord[String, Array[Byte]]) => Vote.parseFrom(cr.value())
}
Re: Spark 2.0.2, Structured Streaming with kafka source... Unable to
parse the value to Object..
Posted by Michael Armbrust <mi...@databricks.com>.
You could also do this with Datasets, which will probably be a little more
efficient (since you are telling us you only care about one column):

  ds1.select($"value".as[Array[Byte]]).map(Student.parseFrom)
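
For readers who want to try the one-liner above in isolation, here is a minimal sketch. It assumes a hypothetical Student case class with a toy "name,age" wire format standing in for the protobuf-generated class from the thread; only the parseFrom(Array[Byte]) shape is taken from the original code. The helper name parseStudents and the object ParseValueColumn are also made up for illustration. The lambda is written with an explicit parameter type, which is one way to keep the call to the overloaded Dataset.map unambiguous.

```scala
import java.nio.charset.StandardCharsets.UTF_8

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical stand-in for the protobuf-generated Student class.
// The fields are invented; only parseFrom(Array[Byte]) mirrors the thread.
case class Student(name: String, age: Int)

object Student {
  // Toy wire format "name,age" in UTF-8, so the sketch runs without protobuf.
  def parseFrom(bytes: Array[Byte]): Student = {
    val Array(name, age) = new String(bytes, UTF_8).split(",", 2)
    Student(name, age.trim.toInt)
  }
}

object ParseValueColumn {
  // Select only the binary `value` column as a Dataset[Array[Byte]],
  // then parse each record. Nothing here is specific to streaming, so the
  // same function can be applied to a batch DataFrame for a quick check.
  def parseStudents(spark: SparkSession, df: DataFrame): Dataset[Student] = {
    import spark.implicits._
    df.select($"value".as[Array[Byte]])
      .map((bytes: Array[Byte]) => Student.parseFrom(bytes))
  }
}
```

With the streaming DataFrame from the thread this would be called as ParseValueColumn.parseStudents(spark, ds1), and the result can be passed to writeStream as before. Whether Spark can derive an encoder for a real generated class depends on its field types, so treat that part as an assumption to verify.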
On Thu, Nov 17, 2016 at 1:05 PM, shyla deshpande <de...@gmail.com>
wrote:
> Hello everyone,
> The following code works ...
>
> def main(args : Array[String]) {
>
> val spark = SparkSession.builder.
> master("local")
> .appName("spark session example")
> .getOrCreate()
>
> import spark.implicits._
>
> val ds1 = spark.readStream.format("kafka").
> option("kafka.bootstrap.servers","localhost:9092").
> option("subscribe","student").load()
>
> val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))
>
> val query = ds2.writeStream
> .outputMode("append")
> .format("console")
> .start()
>
> query.awaitTermination()
>
> }
>
Re: Spark 2.0.2, Structured Streaming with kafka source... Unable to
parse the value to Object..
Posted by shyla deshpande <de...@gmail.com>.
Hello everyone,
The following code works ...

def main(args: Array[String]): Unit = {

  val spark = SparkSession.builder.
    master("local")
    .appName("spark session example")
    .getOrCreate()

  import spark.implicits._

  val ds1 = spark.readStream.format("kafka").
    option("kafka.bootstrap.servers","localhost:9092").
    option("subscribe","student").load()

  val ds2 = ds1.map(row => row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))

  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()

  query.awaitTermination()
}