Posted to user@spark.apache.org by lk_spark <lk...@163.com> on 2019/09/17 02:39:28 UTC
How can I dynamically parse JSON from Kafka when using Structured Streaming?
Hi all,
I'm using Structured Streaming to read from Kafka. The data is a JSON string that I want to parse and convert to a DataFrame, but my code doesn't compile and I don't know how to fix it:
val lines = messages.selectExpr("CAST(value AS STRING) as value").as[String]
val words = lines.map(line => {
  var json: JValue = null
  try {
    json = parse(line)
  } catch {
    case ex: Exception => { println(ex.getMessage + " " + line) }
  }
  //var result: scala.collection.mutable.Map[String,String] = scala.collection.mutable.Map()
  val jsonObj = json.values.asInstanceOf[Map[String, _]]
  val valuse = jsonObj.values.toArray
  val schema = StructType(List())
  for ((k, v) <- jsonObj) {
    //result += (k -> jsonObj.get(k).toString())
    if (v.isInstanceOf[String]) {
      schema.add(k, StringType)
    } else if (v.isInstanceOf[Int]) {
      schema.add(k, IntegerType)
    } /*else if (v.isInstanceOf[Array[String]]) {
      schema.add(k, ArrayType(StringType))
    } else if (v.isInstanceOf[Map[String,String]]) {
      schema.add(k, MapType(StringType, StringType))
    }*/
  }
  val row = new GenericRowWithSchema(valuse, schema)
  row
})
Error:(45, 26) Unable to find encoder for type org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema. An implicit Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema] is needed to store org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
val words = lines.map(line => {
Error:(45, 26) not enough arguments for method map: (implicit evidence$6: org.apache.spark.sql.Encoder[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema])org.apache.spark.sql.Dataset[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema].
Unspecified value parameter evidence$6.
val words = lines.map(line => {
2019-09-17
lk_spark
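Both compile errors above have the same cause: Dataset.map needs an implicit Encoder for its result type, and spark.implicits._ only supplies encoders for primitive types and Product types such as case classes, not for GenericRowWithSchema. A Dataset also carries one schema for all of its rows, so a StructType rebuilt per record cannot be used. A minimal sketch, assuming the fields are known ahead of time and modelled by a hypothetical case class Event(name, age) (TypedJsonSketch and parseEvents are illustrative names too), derives the schema once from the case class and lets from_json and .as[Event] do the parsing:

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}

// Hypothetical record type standing in for whatever fields the JSON carries.
case class Event(name: String, age: Int)

object TypedJsonSketch {
  // Parses a Dataset[String] (single column "value") into a typed Dataset[Event].
  // The schema is derived once from the case class, not rebuilt for every record.
  def parseEvents(spark: SparkSession, lines: Dataset[String]): Dataset[Event] = {
    import spark.implicits._ // provides the implicit Encoder[Event] that .as[Event] needs
    val schema = Encoders.product[Event].schema
    lines
      .select(from_json(col("value"), schema).as("data"))
      .select("data.*")
      .as[Event]
  }
}
```

Separately, StructType is immutable: schema.add(...) returns a new StructType, so the calls inside the original loop discard their results and schema stays empty.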
Re: Re: How can I dynamically parse JSON from Kafka when using Structured Streaming?
Posted by lk_spark <lk...@163.com>.
I want to parse the structure of the data dynamically and then write it to Delta Lake; I think Delta Lake can merge the schema automatically.
2019-09-17
lk_spark
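When the fields are not known in advance, one common workaround is to infer a schema once from a sample of messages and reuse it, because a streaming query's schema is fixed when the query starts. The sketch below assumes such a sample is already available as a Dataset[String] (how it is collected is up to you) and that the Delta Lake library is on the classpath; InferAndMergeSketch and its method names are hypothetical. Schema merging in Delta Lake is opt-in: the write must set the mergeSchema option to true.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType

object InferAndMergeSketch {
  // Infers a single schema from a static sample of JSON strings. The result can
  // be passed to from_json; a streaming query's schema is fixed when it starts.
  def inferSchema(spark: SparkSession, sample: Dataset[String]): StructType =
    spark.read.json(sample).schema

  // Appends a parsed stream to a Delta table. Assumes Delta Lake is on the
  // classpath; mergeSchema = true lets the table schema gain new columns.
  def writeToDelta(parsed: DataFrame, tablePath: String, checkpointPath: String): StreamingQuery =
    parsed.writeStream
      .format("delta")
      .option("mergeSchema", "true")
      .option("checkpointLocation", checkpointPath)
      .start(tablePath)
}
```

Fields that first appear after the query starts are not in the inferred schema, so from_json ignores them until the schema is re-inferred and the query restarted.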
From: Tathagata Das <ta...@gmail.com>
Sent: 2019-09-17 16:13
Subject: Re: How can I dynamically parse JSON from Kafka when using Structured Streaming?
To: "lk_spark"<lk...@163.com>
Cc: "user.spark"<us...@spark.apache.org>
You can use the built-in from_json SQL function to parse JSON.
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.Column-
Re: How can I dynamically parse JSON from Kafka when using Structured Streaming?
Posted by Tathagata Das <ta...@gmail.com>.
You can use the built-in *from_json* SQL function to parse JSON.
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#from_json-org.apache.spark.sql.Column-org.apache.spark.sql.Column-
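A minimal sketch of the from_json approach on the Kafka source, assuming a known schema. The name/age fields, the broker address, the topic name, and the KafkaJsonSketch/parseValue names are hypothetical placeholders, and the spark-sql-kafka-0-10 package is assumed to be on the classpath. Because parseValue only uses column expressions, it works on a streaming DataFrame and a batch one alike, and no custom Encoder is needed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object KafkaJsonSketch {
  // Hypothetical schema: replace these fields with those your messages carry.
  val schema: StructType = new StructType()
    .add("name", StringType)
    .add("age", IntegerType)

  // Casts the Kafka `value` column to a string, parses it with from_json and
  // flattens the resulting struct into top-level columns.
  def parseValue(df: DataFrame): DataFrame =
    df.select(from_json(col("value").cast("string"), schema).as("data"))
      .select("data.*")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("kafka-json-sketch").getOrCreate()
    val messages = spark.readStream
      .format("kafka") // requires the spark-sql-kafka-0-10 package
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
      .option("subscribe", "events") // hypothetical topic name
      .load()
    parseValue(messages).writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}
```

Malformed messages do not fail the query in the default mode; their parsed fields simply come out as null.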