Posted to user@spark.apache.org by 萝卜丝炒饭 <14...@qq.com> on 2017/06/27 15:08:20 UTC

The function of countByValueAndWindow and foreachRDD in DStream: could you please help me understand it?

Hi all,

I have the code below:
    import java.text.SimpleDateFormat
    import java.util.Date

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
//    Logger.getLogger("org.apache.spark.streaming.dstream").setLevel(Level.DEBUG)
    val conf = new SparkConf().setAppName("testDstream").setMaster("local[4]")
//    val sc = SparkContext.getOrCreate(conf)
    val ssc = new StreamingContext(conf, Seconds(1))   // 1-second batch interval

    ssc.checkpoint("E:\\spark\\tmp\\cp")
    val lines = ssc.socketTextStream("127.0.0.1", 9999)
    // Print every element of each 1-second batch RDD.
    // Note: in SimpleDateFormat, "mm" is minutes and "MM" is month, so "yyyy-mm-dd"
    // prints the minutes in the month position. The pattern is kept as posted.
    lines.foreachRDD( r => {
      println("RDD" + r.id + "begin" + "   " + new SimpleDateFormat("yyyy-mm-dd  HH:MM:SS").format(new Date()))
      r.foreach( ele => println(":::" + ele))
      println("RDD" + r.id + "end")
    })
    // Count each distinct value over a 4-second window, sliding every 1 second.
    lines.countByValueAndWindow( Seconds(4), Seconds(1)).foreachRDD( s => {      // here is key code
      println("countByValueAndWindow RDD ID IS : " + s.id + "begin")
      println("time is " + new SimpleDateFormat("yyyy-mm-dd  HH:MM:SS").format(new Date()))
      s.foreach( e => println("data is " + e._1 + " :" + e._2))
      println("countByValueAndWindow RDD ID IS : " + s.id + "end")
    })

    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate

I ran the code and used "nc" to send messages manually, typing about one character per second. I know the timestamps in the log do not line up exactly with the window boundaries, but I think they are very close. The output, with my comments marked by "<==", is:

-----------------------------------------------------------
RDD1begin   2017-41-27  22:06:16
RDD1end
countByValueAndWindow RDD ID IS : 7  begin
time is 2017-41-27  22:06:16
countByValueAndWindow RDD ID IS : 7  end
RDD8begin   2017-41-27  22:06:17
RDD8end
countByValueAndWindow RDD ID IS : 13  begin
time is 2017-41-27  22:06:17
countByValueAndWindow RDD ID IS : 13  end
RDD14begin   2017-41-27  22:06:18
:::1
RDD14end
countByValueAndWindow RDD ID IS : 19  begin
time is 2017-41-27  22:06:18  <== data from 22:06:15 -- 22:06:18 is in RDD 14.
data is 1 :1
countByValueAndWindow RDD ID IS : 19  end
RDD20begin   2017-41-27  22:06:19
:::2
RDD20end
countByValueAndWindow RDD ID IS : 25  begin
time is 2017-41-27  22:06:19  <== data from 22:06:16 -- 22:06:19 is in RDD 14, 20.
data is 1 :1
data is 2 :1
countByValueAndWindow RDD ID IS : 25  end
RDD26begin   2017-41-27  22:06:20
:::3
RDD26end
countByValueAndWindow RDD ID IS : 31  begin
time is 2017-41-27  22:06:20  <== data from 22:06:17 -- 22:06:20 is in RDD 14, 20, 26.
data is 2 :1
data is 1 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 31  end
RDD32begin   2017-41-27  22:06:21
:::4
RDD32end
countByValueAndWindow RDD ID IS : 37  begin
time is 2017-41-27  22:06:21  <== data from 22:06:18 -- 22:06:21 is in RDD 14, 20, 26, 32.
data is 2 :1
data is 1 :1
data is 4 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 37  end
RDD38begin   2017-41-27  22:06:22
:::5
:::6
RDD38end
countByValueAndWindow RDD ID IS : 43  begin
time is 2017-41-27  22:06:22  <== data from 22:06:19 -- 22:06:22 is in RDD 20, 26, 32, 38. Here 14 is out of window.
data is 4 :1
data is 5 :1
data is 6 :1
data is 2 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 43  end
RDD44begin   2017-41-27  22:06:23
:::7
RDD44end
countByValueAndWindow RDD ID IS : 49  begin
time is 2017-41-27  22:06:23  <== data from 22:06:20 -- 22:06:23 is in RDD 26, 32, 38, 44. Here 20 is out of window.
data is 5 :1
data is 4 :1
data is 6 :1
data is 7 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 49  end
-----------------------------------------------------------

I think the foreachRDD function outputs the latest RDD calculated by countByValueAndWindow, and the log above confirms this.

Now I change the highlighted line (the one marked "here is key code") to

    lines.countByValueAndWindow( Seconds(4), Seconds(6)).foreachRDD( s => {      // here is key code

so the slide duration is 6 seconds. The log, with my comments, is below:

-----------------------------------------------------------
RDD1begin   2017-59-27  10:59:12
RDD1end
RDD2begin   2017-59-27  10:59:13
:::1
:::2
RDD2end
RDD3begin   2017-59-27  10:59:14
:::3
RDD3end
RDD4begin   2017-59-27  10:59:15
:::4
RDD4end
RDD5begin   2017-59-27  10:59:16
:::5
RDD5end
RDD6begin   2017-59-27  10:59:17
RDD6end
countByValueAndWindow RDD ID IS : 22  begin
time is 2017-59-27  10:59:17  <== I think this is OK, even though RDD2 is counted.
data is 4 :1
data is 5 :1
data is 1 :1
data is 2 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 22  end
RDD23begin   2017-59-27  10:59:18
:::6
RDD23end
RDD24begin   2017-59-27  10:59:19
:::8
:::7
RDD24end
RDD25begin   2017-59-27  10:59:20
:::9
RDD25end
RDD26begin   2017-59-27  10:59:21
:::0
RDD26end
RDD27begin   2017-59-27  10:59:22
:::-
RDD27end
RDD28begin   2017-59-27  10:59:23
:::p
RDD28end
countByValueAndWindow RDD ID IS : 43  begin
time is 2017-59-27  10:59:23  <== the data between 10:59:20 -- 10:59:23 should be RDD 25, 26, 27, 28, but the data is wrong.
data is 6 :1
data is 2 :1
data is 9 :1
data is - :1
data is 1 :1
data is 8 :1
data is p :1
data is 0 :1
data is 7 :1
countByValueAndWindow RDD ID IS : 43  end
RDD44begin   2017-59-27  10:59:24
:::o
RDD44end
RDD46begin   2017-59-27  10:59:25
:::i
RDD46end
RDD47begin   2017-59-27  10:59:26
:::u
RDD47end
RDD48begin   2017-59-27  10:59:27
:::y
RDD48end
RDD49begin   2017-59-27  10:59:28
:::t
RDD49end
RDD50begin   2017-59-27  10:59:29
:::r
RDD50end
countByValueAndWindow RDD ID IS : 65  begin
time is 2017-59-27  10:59:29  <== here is wrong too.
data is 6 :1
data is 2 :1
data is r :1
data is 8 :1
data is t :1
data is i :1
data is y :1
data is u :1
data is 1 :1
data is 7 :1
data is o :1
countByValueAndWindow RDD ID IS : 65  end
-----------------------------------------------------------

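To make the expectation behind my "<==" comments concrete, here is a minimal sketch in plain Scala (no Spark involved). It assumes that a window of length W, emitted every S batches, covers only the last W one-second batches before the emission point. The object name WindowSketch and the helper expectedWindows are hypothetical names used only for this illustration; they are not part of the Spark API, and the sketch says nothing about how Spark computes the window internally.

```scala
// Plain-Scala sketch of the window coverage I expect (an assumption, not Spark code).
object WindowSketch {
  /** For each emission point (every `slide` batches), the 1-based batch
    * indices that a window of `windowLen` batches is expected to cover. */
  def expectedWindows(numBatches: Int, windowLen: Int, slide: Int): Seq[(Int, Seq[Int])] =
    (slide to numBatches by slide).map { end =>
      val start = math.max(1, end - windowLen + 1) // clamp at the first batch
      end -> (start to end)
    }

  def main(args: Array[String]): Unit = {
    // First run: window = 4 batches, slide = 1 batch.
    expectedWindows(6, 4, 1).foreach { case (end, batches) =>
      println(s"slide=1, window ending at batch $end covers: ${batches.mkString(", ")}")
    }
    // Second run: window = 4 batches, slide = 6 batches.
    expectedWindows(12, 4, 6).foreach { case (end, batches) =>
      println(s"slide=6, window ending at batch $end covers: ${batches.mkString(", ")}")
    }
  }
}
```

Under this assumption, the second run over 12 batches would emit windows covering batches 3 to 6 and 9 to 12, which is why I expected only the data of RDD 25, 26, 27 and 28 in the output at 10:59:23.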
Could you please tell me why the second log does not match my understanding? This issue has been puzzling me for several days.

Thanks,
Fei Shao