Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2015/05/11 13:21:00 UTC

[jira] [Resolved] (SPARK-7326) Performing window() on a WindowedDStream doesn't work all the time

     [ https://issues.apache.org/jira/browse/SPARK-7326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-7326.
------------------------------
       Resolution: Fixed
    Fix Version/s: 1.4.0

Issue resolved by pull request 5871
[https://github.com/apache/spark/pull/5871]

> Performing window() on a WindowedDStream doesn't work all the time
> ------------------------------------------------------------------
>
>                 Key: SPARK-7326
>                 URL: https://issues.apache.org/jira/browse/SPARK-7326
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.1
>            Reporter: Wesley Miao
>             Fix For: 1.4.0
>
>
> A similar issue was reported before but got no response:
> http://apache-spark-user-list.1001560.n3.nabble.com/Windows-of-windowed-streams-not-displaying-the-expected-results-td466.html
> I hit the same issue recently; it can be reproduced in 1.3.1 with the following piece of code:
> import scala.collection.mutable
> import scala.util.Random
> import org.apache.spark.SparkConf
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Milliseconds, StreamingContext}
>
> object WindowOnWindowedDStream {
>   def main(args: Array[String]): Unit = {
>     val batchInterval = Milliseconds(1234)
>     val sparkConf = new SparkConf()
>       .setAppName("WindowOnWindowedDStream")
>       .setMaster("local[2]")
>     val ssc = new StreamingContext(sparkConf, batchInterval)
>     ssc.checkpoint("checkpoint")
>     // Each RDD holds 1000 (word, 1) pairs drawn from 5 distinct words.
>     def createRDD(i: Int): RDD[(String, Int)] = {
>       val count = 1000
>       val rawLogs = (1 to count).map { _ =>
>         val word = "word" + Random.nextInt.abs % 5
>         (word, 1)
>       }
>       ssc.sparkContext.parallelize(rawLogs)
>     }
>     val rddQueue = mutable.Queue[RDD[(String, Int)]]()
>     val rawLogStream = ssc.queueStream(rddQueue)
>     (1 to 300).foreach { i =>
>       rddQueue.enqueue(createRDD(i))
>     }
>     // l1: window and slide of 5 source batches; l2: window and slide of 15.
>     val l1 = rawLogStream.window(batchInterval * 5, batchInterval * 5).reduceByKey(_ + _)
>     val l2 = l1.window(batchInterval * 15, batchInterval * 15).reduceByKey(_ + _)
>     l1.print()
>     l2.print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> Here we have two windowed DStream instances, l1 and l2.
> l1 is the DStream obtained by calling window() on the source DStream, with both window and slide duration set to 5 times the batch interval of the source stream.
> l2 is the DStream obtained by calling window() on l1, with both window and slide duration set to 3 times l1's slide duration, i.e. 15 times the source stream's batch interval.
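> For concreteness, here is the batch-time arithmetic as a plain-Scala sketch (not Spark code; times are offsets in ms from zeroTime, the time the streaming context starts):
>     val batch = 1234L
>     val l1Times = (1 to 6).map(n => n * 5 * batch)    // 6170, 12340, 18510, ...
>     val l2Times = (1 to 2).map(m => m * 15 * batch)   // 18510, 37020
>     // Every l2 batch time coincides with an l1 batch time (18510 = 3 * 6170),
>     // so l2 ought to find data from l1 at each of its batch times.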
> From the output of this simple streaming app, I only see data printed from l1; nothing is ever printed from l2.
> Diving into the source code, I found that the problem most likely resides in the DStream.slice() implementation, shown below:
>   def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
>     if (!isInitialized) {
>       throw new SparkException(this + " has not been initialized")
>     }
>     if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
>       logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
>         + slideDuration + ")")
>     }
>     if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
>       logWarning("toTime (" + toTime + ") is not a multiple of slideDuration ("
>         + slideDuration + ")")
>     }
>     // floor() here ignores zeroTime, which is where the alignment breaks:
>     val alignedToTime = toTime.floor(slideDuration)
>     val alignedFromTime = fromTime.floor(slideDuration)
>     logInfo("Slicing from " + fromTime + " to " + toTime +
>       " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
>     alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
>       if (time >= zeroTime) getOrCompute(time) else None
>     })
>   }
> After floor() is applied to both fromTime and toTime here, the resulting (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) may no longer be multiples of slideDuration, which makes the isTimeValid() check fail for all the remaining computations, so slice() returns no RDDs.
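> To make this concrete, here is a plain-Scala sketch of the arithmetic (the zeroTime of 5000 ms is a made-up start time; any start time that is not itself a multiple of the slide duration shows the same effect):
>     val zeroTime = 5000L                  // when the streaming context started (illustrative)
>     val slide    = 5 * 1234L              // l1's slideDuration: 6170 ms
>     val toTime   = zeroTime + 3 * slide   // 23510 ms, a valid l1 batch time
>     // floor() as implemented in 1.3.1 ignores zeroTime:
>     val aligned  = (toTime / slide) * slide   // 18510 ms
>     (aligned - zeroTime) % slide              // 13510 % 6170 = 1170, not 0, so isTimeValid() fails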
> The fix would be to add a new floor() overload in Time.scala that respects zeroTime while flooring:
>   // Floor this time to the nearest earlier multiple of `that`,
>   // measured from zeroTime rather than from the epoch.
>   def floor(that: Duration, zeroTime: Time): Time = {
>     val t = that.milliseconds
>     new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
>   }
> And then change DStream.slice() to call this new floor() function, passing in its zeroTime:
>     val alignedToTime = toTime.floor(slideDuration, zeroTime)
>     val alignedFromTime = fromTime.floor(slideDuration, zeroTime)
> This way alignedToTime and alignedFromTime are *really* aligned with respect to zeroTime, whose value is generally not 0.
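> Re-running the sketch above with the zeroTime-aware floor() shows the alignment is restored:
>     val alignedNew = ((toTime - zeroTime) / slide) * slide + zeroTime   // 23510 ms
>     (alignedNew - zeroTime) % slide                                     // 0, a valid batch time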
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org