Posted to user@spark.apache.org by lk_spark <lk...@163.com> on 2019/11/25 08:00:03 UTC

how to write from spark structured streaming to kudu

hi, all:
       I'm using Spark 2.4.4 to read a stream from Kafka and want to write it to Kudu 1.7.0. My code is like below:

    val kuduContext = new KuduContext("master:7051", spark.sparkContext)

    val console = cnew.select("*").as[CstoreNew]
      .writeStream
      .option("checkpointLocation", "/tmp/t3/")
      .trigger(Trigger.Once())
      .foreach(new ForeachWriter[CstoreNew] {
        override def open(partitionId: Long, version: Long): Boolean = {
          true
        }
        override def process(value: CstoreNew): Unit = {
          val spark = SparkSessionSingleton.getInstance(sparkConf)
          val valueDF = Seq(value).toDF()   // fails here with a NullPointerException
          kuduContext.upsertRows(valueDF, "impala::test.cstore_bury_event_data")
        }
        override def close(errorOrNull: Throwable): Unit = {
        }
      })
    val query = console.start()
    query.awaitTermination()

When execution reaches val valueDF = Seq(value).toDF() I get this error message:
Caused by: java.lang.NullPointerException
 at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:228)
 at com.gaojihealth.spark.kafkaconsumer.CstoreNew2KUDU$$anon$1.process(CstoreNew2KUDU.scala:122)
...

and SQLImplicits.scala:228 is:

227:   implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
228:     DatasetHolder(_sqlContext.createDataset(s))
229:   }

Can anyone give me some help?
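
For reference, a likely explanation: toDF() relies on the SQLContext that spark.implicits._ captured on the driver, and ForeachWriter.process runs on an executor after the writer has been deserialized, where that captured _sqlContext is never initialized, hence the NullPointerException. Below is a minimal sketch of an alternative that upserts each micro-batch as a whole through foreachBatch (available since Spark 2.4, so it fits 2.4.4), assuming the same cnew, CstoreNew and kuduContext values as above:

    import org.apache.spark.sql.Dataset
    import org.apache.spark.sql.streaming.Trigger

    val query = cnew.select("*").as[CstoreNew]
      .writeStream
      .option("checkpointLocation", "/tmp/t3/")
      .trigger(Trigger.Once())
      .foreachBatch { (batch: Dataset[CstoreNew], batchId: Long) =>
        // batch is an ordinary Dataset built by Spark, so there is no
        // per-row toDF() and no executor-side SQLContext involved;
        // the foreachBatch function itself runs on the driver
        kuduContext.upsertRows(batch.toDF(), "impala::test.cstore_bury_event_data")
      }
      .start()

    query.awaitTermination()

(With a Scala 2.12 build the lambda may need to be passed as an explicitly typed (Dataset[CstoreNew], Long) => Unit value, because foreachBatch is overloaded; the default Scala 2.11 build of Spark 2.4.4 compiles it as written.)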
2019-11-25


lk_spark 

Re: how to write from spark structured streaming to kudu

Posted by lk_spark <lk...@163.com>.
I found that _sqlContext is null. How can I resolve it?
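
If keeping a per-row ForeachWriter is preferred, another option is to avoid going back through Spark SQL inside process() at all and instead write each row with the Kudu Java client, which needs nothing from the driver-side SparkSession. A rough sketch, where the id and event columns are hypothetical placeholders for the real fields of CstoreNew and of impala::test.cstore_bury_event_data:

    import org.apache.kudu.client.{KuduClient, KuduSession, KuduTable}
    import org.apache.spark.sql.ForeachWriter

    // One Kudu client/session per partition; rows are upserted directly,
    // so nothing from the driver-side SparkSession is needed on executors.
    class KuduUpsertWriter(masters: String, tableName: String)
        extends ForeachWriter[CstoreNew] {

      private var client: KuduClient = _
      private var table: KuduTable = _
      private var session: KuduSession = _

      override def open(partitionId: Long, version: Long): Boolean = {
        client = new KuduClient.KuduClientBuilder(masters).build()
        table = client.openTable(tableName)
        session = client.newSession()
        true
      }

      override def process(value: CstoreNew): Unit = {
        val upsert = table.newUpsert()
        val row = upsert.getRow
        row.addLong("id", value.id)          // hypothetical column
        row.addString("event", value.event)  // hypothetical column
        session.apply(upsert)
      }

      override def close(errorOrNull: Throwable): Unit = {
        if (session != null) session.close()
        if (client != null) client.close()
      }
    }

It would replace the anonymous writer in the original code as .foreach(new KuduUpsertWriter("master:7051", "impala::test.cstore_bury_event_data")).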

2019-11-25 

lk_spark 


