You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by lk_spark <lk...@163.com> on 2019/11/25 08:00:03 UTC
how spark structured stream write to kudu
hi,all:
I'm using Spark 2.4.4 to read a stream of data from Kafka and want to write it to Kudu 1.7.0; my code is like below:
val kuduContext = new KuduContext("master:7051", spark.sparkContext)
val console = cnew.select("*").as[CstoreNew]
.writeStream
.option("checkpointLocation", "/tmp/t3/")
.trigger(Trigger.Once())
.foreach(new ForeachWriter[CstoreNew] {
override def open(partitionId: Long, version: Long): Boolean = {
true
}
override def process(value: CstoreNew): Unit = {
val spark = SparkSessionSingleton.getInstance(sparkConf)
val valueDF = Seq(value).toDF() // GET WRONG
kuduContext.upsertRows(valueDF, "impala::test.cstore_bury_event_data")
}
override def close(errorOrNull: Throwable): Unit = {
}
})
val query = console.start()
query.awaitTermination()
when execution reaches val valueDF = Seq(value).toDF() I get this error message:
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:228)
at com.gaojihealth.spark.kafkaconsumer.CstoreNew2KUDU$$anon$1.process(CstoreNew2KUDU.scala:122)
...
and SQLImplicits.scala:228 is :
227: implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
228: DatasetHolder(_sqlContext.createDataset(s))
229: }
can anyone give me some help?
2019-11-25
lk_spark
Re: how spark structured stream write to kudu
Posted by lk_spark <lk...@163.com>.
I found that _sqlContext is null — how do I resolve it?
2019-11-25
lk_spark
发件人:"lk_spark"<lk...@163.com>
发送时间:2019-11-25 16:00
主题:how spark structured stream write to kudu
收件人:"user.spark"<us...@spark.apache.org>
抄送:
hi,all:
I'm using Spark 2.4.4 to read a stream of data from Kafka and want to write it to Kudu 1.7.0; my code is like below:
val kuduContext = new KuduContext("master:7051", spark.sparkContext)
val console = cnew.select("*").as[CstoreNew]
.writeStream
.option("checkpointLocation", "/tmp/t3/")
.trigger(Trigger.Once())
.foreach(new ForeachWriter[CstoreNew] {
override def open(partitionId: Long, version: Long): Boolean = {
true
}
override def process(value: CstoreNew): Unit = {
val spark = SparkSessionSingleton.getInstance(sparkConf)
val valueDF = Seq(value).toDF() // GET WRONG
kuduContext.upsertRows(valueDF, "impala::test.cstore_bury_event_data")
}
override def close(errorOrNull: Throwable): Unit = {
}
})
val query = console.start()
query.awaitTermination()
when execution reaches val valueDF = Seq(value).toDF() I get this error message:
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:228)
at com.gaojihealth.spark.kafkaconsumer.CstoreNew2KUDU$$anon$1.process(CstoreNew2KUDU.scala:122)
...
and SQLImplicits.scala:228 is :
227: implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
228: DatasetHolder(_sqlContext.createDataset(s))
229: }
can anyone give me some help?
2019-11-25
lk_spark