Posted to user@spark.apache.org by lu...@sina.com on 2016/09/05 11:00:31 UTC

Re: [SparkSQL+SparkStreaming]SparkStreaming APP can not load data into SparkSQL table

The data is written into HDFS as Parquet successfully, but the subsequent LOAD DATA step does not work as expected.
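
One way to sidestep the LOAD DATA step entirely would be to append each batch straight into the Hive table. This is only an untested sketch against Spark 1.6, and it assumes recDF is created from the HiveContext (hc) rather than a plain SQLContext, so that the table name resolves against the Hive metastore:

    import org.apache.spark.sql.SaveMode

    // Append this batch directly into the Hive-managed Parquet table,
    // skipping the intermediate HDFS directory and the LOAD DATA statement.
    recDF.write.mode(SaveMode.Append).saveAsTable("nginxlog")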

--------------------------------

 

Thanks & Best regards!
San.Luo

----- Original Message -----
From: <lu...@sina.com>
To: "user" <us...@spark.apache.org>
Subject: [SparkSQL+SparkStreaming]SparkStreaming APP can not load data into SparkSQL table
Date: 2016-09-05 18:55

Hi guys, I have a question: my Spark Streaming app cannot load data into a Spark SQL table. Here is my code:
    val conf = new SparkConf().setAppName("KafkaStreaming for " + topics).setMaster("spark://master60:7077")
    val storageLevel = StorageLevel.DISK_ONLY
    val ssc = new StreamingContext(conf, Seconds(batchInterval.toInt))
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Receiver-based Kafka stream
    val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, storageLevel)

    kafkaStream.foreachRDD { rdd =>
      val x = rdd.count()
      println(s"================processing $x records=================")
      rdd.collect().foreach(println) // debug: collects the whole batch to the driver
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._
      val logRDD = sqlContext.read.json(rdd.values).select("payload").map(_.mkString)
      val logRDD2 = logRDD.map(_.split(',')).map { x =>
        NginxLog(x(0).trim().toFloat.toInt,
          x(1).trim(),
          x(2).trim(),
          x(3).trim(),
          x(4).trim(),
          x(5).trim(),
          x(6).trim(),
          x(7).trim(),
          x(8).trim(),
          x(9).trim(),
          x(10).trim())
      }
      val recDF = logRDD2.toDF
      recDF.printSchema()

      val hc = new org.apache.spark.sql.hive.HiveContext(rdd.sparkContext)
      val index = rdd.id
      recDF.write.parquet(s"/etl/tables/nginxlog/${topicNO}/${index}")
      hc.sql("CREATE TABLE IF NOT EXISTS nginxlog(msec Int,remote_addr String,u_domain String,u_url String,u_title String,u_referrer String,u_sh String,u_sw String,u_cd String,u_lang String,u_utrace String) STORED AS PARQUET")      hc.sql(s"LOAD DATA INPATH '/etl/tables/nginxlog/${topicNO}/${index}' INTO TABLE nginxlog")    }

No exceptions are thrown while the app runs. However, only the first batch's data ends up in the nginxlog table; none of the later batches are loaded successfully. I cannot understand this behavior. Could it be an issue with my HiveContext (hc)?
PS: my Spark cluster version is 1.6.1.
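
For reference, the Spark Streaming programming guide recommends reusing one lazily instantiated context rather than constructing a new one inside every foreachRDD call. A minimal sketch of that pattern, adapted from the guide's SQLContext singleton to HiveContext (the object name here is hypothetical):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.HiveContext

    // Lazily instantiated singleton, so every micro-batch reuses the same
    // HiveContext instead of building a fresh one per RDD.
    object HiveContextSingleton {
      @transient private var instance: HiveContext = _

      def getInstance(sparkContext: SparkContext): HiveContext = synchronized {
        if (instance == null) {
          instance = new HiveContext(sparkContext)
        }
        instance
      }
    }

    // Inside foreachRDD, replace `new HiveContext(rdd.sparkContext)` with:
    // val hc = HiveContextSingleton.getInstance(rdd.sparkContext)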




--------------------------------

 

Thanks&amp;Best regards!
San.Luo