Posted to issues@spark.apache.org by "QingFeng Zhang (JIRA)" <ji...@apache.org> on 2014/05/12 14:28:15 UTC

[jira] [Reopened] (SPARK-1797) streaming on hdfs can detected all new file, but the sum of all the rdd.count() not equals which had detected

     [ https://issues.apache.org/jira/browse/SPARK-1797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

QingFeng Zhang reopened SPARK-1797:
-----------------------------------


> streaming on hdfs can detected all new file, but the sum of all the rdd.count() not equals which had detected
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-1797
>                 URL: https://issues.apache.org/jira/browse/SPARK-1797
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, Spark Core
>    Affects Versions: 0.9.0
>         Environment: Spark 0.9.0, Hadoop 2.3.0, 1 master, 5 slaves.
>            Reporter: QingFeng Zhang
>         Attachments: 1.png
>
>
> When I put 200 PNG files into HDFS, Spark Streaming detects all 200 files, but the sum of rdd.count() over all batches is less than 200, always between 130 and 170. I don't know why. Is this a bug?
> PS: When I put the 200 files into HDFS before the streaming job starts, it gets the correct count and the right result.
>   def main(args: Array[String]) {
>     val conf = new SparkConf().setMaster(SparkURL)
>       .setAppName("QimageStreaming-broadcast")
>       .setSparkHome(System.getenv("SPARK_HOME"))
>       .setJars(SparkContext.jarOfClass(this.getClass()))
>     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>     conf.set("spark.kryo.registrator", "qing.hdu.Image.MyRegistrator")
>     conf.set("spark.kryoserializer.buffer.mb", "10")
>     val ssc = new StreamingContext(conf, Seconds(2))
>     val inputFormatClass = classOf[QimageInputFormat[Text, Qimage]]
>     val outputFormatClass = classOf[QimageOutputFormat[Text, Qimage]]
>     val input_path = HdfsURL + "/Qimage/input"
>     val output_path = HdfsURL + "/Qimage/output/"
>     val bg_path = HdfsURL + "/Qimage/bg/"
>     val bg = ssc.sparkContext.newAPIHadoopFile[Text, Qimage, QimageInputFormat[Text, Qimage]](bg_path)
>     val bbg = bg.map(data => (data._1.toString(), data._2))
>     val broadcastbg = ssc.sparkContext.broadcast(bbg)
>     val file = ssc.fileStream[Text, Qimage, QimageInputFormat[Text, Qimage]](input_path)
>     val qingbg = broadcastbg.value.collectAsMap
>     val foreachFunc = (rdd: RDD[(Text, Qimage)], time: Time) => {
>       val rddnum = rdd.count
>       System.out.println("\n\n" + "rddnum is " + rddnum + "\n\n")
>       if (rddnum > 0) {
>         System.out.println("here is foreachFunc")
>         val a = rdd.keys
>         val b = a.first
>         val cbg = qingbg.get(getbgID(b)).getOrElse(new Qimage)
>         rdd.map(data => (data._1, (new QimageProc(data._1, data._2)).koutu(cbg)))
>           .saveAsNewAPIHadoopFile(output_path, classOf[Text], classOf[Qimage], outputFormatClass)
>       }
>     }
>     file.foreachRDD(foreachFunc)
>     ssc.start()
>     ssc.awaitTermination()
>   }
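
To compare the number of detected files with the records actually processed, it may help to keep a running total of the per-batch rdd.count() values on the driver. The following is only a minimal sketch: BatchCountTracker and BatchCountTrackerDemo are hypothetical names that appear nowhere in the report or in Spark, and the per-batch counts in the demo are made-up stand-ins, not measurements from this issue.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper (not part of Spark or of the reporter's code):
// keeps a running total of per-batch record counts on the driver.
final class BatchCountTracker {
  private val total = new AtomicLong(0L)
  private val batches = new AtomicLong(0L)

  // Record one batch's count and return the running total so far.
  def record(batchCount: Long): Long = {
    batches.incrementAndGet()
    total.addAndGet(batchCount)
  }

  def totalRecords: Long = total.get()
  def batchesSeen: Long = batches.get()
}

object BatchCountTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new BatchCountTracker
    // Illustrative stand-ins for the rdd.count() values of successive batches.
    val perBatchCounts = Seq(0L, 60L, 0L, 90L, 50L)
    perBatchCounts.foreach { c =>
      val running = tracker.record(c)
      println(s"batch count = $c, running total = $running")
    }
    println(s"total = ${tracker.totalRecords} over ${tracker.batchesSeen} batches")
  }
}
```

In the job above, the function passed to foreachRDD runs on the driver, so a call such as tracker.record(rddnum) could be placed right after rddnum is computed. The final total can then be checked against the number of files put into HDFS.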



--
This message was sent by Atlassian JIRA
(v6.2#6252)