Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2015/05/11 13:21:00 UTC
[jira] [Resolved] (SPARK-7326) Performing window() on a WindowedDStream doesn't work all the time
[ https://issues.apache.org/jira/browse/SPARK-7326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-7326.
------------------------------
Resolution: Fixed
Fix Version/s: 1.4.0
Issue resolved by pull request 5871
[https://github.com/apache/spark/pull/5871]
> Performing window() on a WindowedDStream doesn't work all the time
> ------------------------------------------------------------------
>
> Key: SPARK-7326
> URL: https://issues.apache.org/jira/browse/SPARK-7326
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.3.1
> Reporter: Wesley Miao
> Fix For: 1.4.0
>
>
> Someone reported a similar issue before but got no response:
> http://apache-spark-user-list.1001560.n3.nabble.com/Windows-of-windowed-streams-not-displaying-the-expected-results-td466.html
> I ran into the same issue recently. It can be reproduced in 1.3.1 with the following code:
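> import scala.collection.mutable
> import scala.util.Random
>
> import org.apache.spark.SparkConf
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Milliseconds, StreamingContext}
>
> object WindowOnWindowedDStream {
>   def main(args: Array[String]): Unit = {
>     val batchInterval = "1234" // batch interval in milliseconds
>     val sparkConf = new SparkConf()
>       .setAppName("WindowOnWindowedDStream")
>       .setMaster("local[2]")
>     val ssc = new StreamingContext(sparkConf, Milliseconds(batchInterval.toInt))
>     ssc.checkpoint("checkpoint")
>
>     // Build one RDD of 1000 (word, 1) pairs drawn from five distinct words.
>     def createRDD(i: Int): RDD[(String, Int)] = {
>       val count = 1000
>       val rawLogs = (1 to count).map { _ =>
>         val word = "word" + Random.nextInt(5)
>         (word, 1)
>       }
>       ssc.sparkContext.parallelize(rawLogs)
>     }
>
>     // Pre-fill the queue so that queueStream emits one RDD per batch.
>     val rddQueue = mutable.Queue[RDD[(String, Int)]]()
>     val rawLogStream = ssc.queueStream(rddQueue)
>     (1 to 300).foreach { i =>
>       rddQueue.enqueue(createRDD(i))
>     }
>
>     // l1: window = slide = 5 batches; l2: window = slide = 15 batches (3 of l1's slides).
>     val l1 = rawLogStream.window(Milliseconds(batchInterval.toInt) * 5, Milliseconds(batchInterval.toInt) * 5).reduceByKey(_ + _)
>     val l2 = l1.window(Milliseconds(batchInterval.toInt) * 15, Milliseconds(batchInterval.toInt) * 15).reduceByKey(_ + _)
>     l1.print()
>     l2.print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }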
> Here we have two windowed DStream instances, l1 and l2.
> l1 is the DStream produced by calling window() on the source DStream, with both the window and slide durations set to 5 times the source stream's batch interval.
> l2 is the DStream produced by calling window() on l1, with both the window and slide durations set to 3 times l1's slide duration, i.e. 15 times the source stream's batch interval.
> In the output of this simple streaming app, I only see data printed from l1; nothing is printed from l2.
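A quick arithmetic sketch of the durations in the reproducer, assuming its 1234 ms batch interval. The object name `NestedWindowDurations` and the `tumble` helper are hypothetical, and grouping batch indices into chunks is only a conceptual model of non-overlapping windows (window == slide), not how Spark computes them.

```scala
// Hypothetical helper object: plain arithmetic, no Spark dependency.
object NestedWindowDurations {
  val batchMs: Long = 1234L
  val l1SlideMs: Long = batchMs * 5  // l1: window = slide = 5 source batches
  val l2SlideMs: Long = batchMs * 15 // l2: window = slide = 15 source batches

  // Conceptual model of a tumbling window (window == slide):
  // split a sequence into consecutive, non-overlapping chunks of n items.
  def tumble[A](items: Seq[A], n: Int): List[List[A]] =
    items.grouped(n).map(_.toList).toList

  def main(args: Array[String]): Unit = {
    println(l1SlideMs)             // 6170
    println(l2SlideMs)             // 18510
    println(l2SlideMs / l1SlideMs) // 3 l1 outputs per l2 window

    // Source batch indices 1..15 grouped into l1 windows, then l1 windows into l2 windows.
    val l1Windows = tumble(1 to 15, 5)
    val l2Windows = tumble(l1Windows, 3)
    println(l1Windows)
    println(l2Windows.size) // 1
  }
}
```

Under this model, each l2 output would combine three consecutive l1 outputs.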
> Digging into the source code, I found that the problem most likely lies in the DStream.slice() implementation, shown below.
> def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
>   if (!isInitialized) {
>     throw new SparkException(this + " has not been initialized")
>   }
>   if (!(fromTime - zeroTime).isMultipleOf(slideDuration)) {
>     logWarning("fromTime (" + fromTime + ") is not a multiple of slideDuration ("
>       + slideDuration + ")")
>   }
>   if (!(toTime - zeroTime).isMultipleOf(slideDuration)) {
>     logWarning("toTime (" + toTime + ") is not a multiple of slideDuration ("
>       + slideDuration + ")")
>   }
>   // floor() rounds down to a multiple of slideDuration counted from time 0, not from zeroTime.
>   val alignedToTime = toTime.floor(slideDuration)
>   val alignedFromTime = fromTime.floor(slideDuration)
>   logInfo("Slicing from " + fromTime + " to " + toTime +
>     " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")
>   // Collect the RDD for every slide-aligned time in the range, skipping times before zeroTime.
>   alignedFromTime.to(alignedToTime, slideDuration).flatMap(time => {
>     if (time >= zeroTime) getOrCompute(time) else None
>   })
> }
> Here, flooring fromTime and toTime to slideDuration without respecting zeroTime means that, whenever zeroTime is not itself a multiple of slideDuration, (alignedFromTime - zeroTime) and (alignedToTime - zeroTime) are no longer multiples of slideDuration. The isTimeValid() check then fails for all the remaining computation.
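A minimal sketch of this misalignment on plain `Long` milliseconds. It assumes a hypothetical zeroTime of 1234 ms (a multiple of the 1234 ms batch interval but not of l1's 6170 ms slide duration) and assumes l2 asks l1 for the interval 7404 to 19744 ms, both ends of which are aligned relative to zeroTime. `floorFromEpoch` mimics a floor that ignores zeroTime, and `isAligned` is a simplified stand-in for the multiple-of-slideDuration condition, not Spark's isTimeValid().

```scala
// Hypothetical illustration; names and values are assumptions, not Spark code.
object EpochFloorSketch {
  val slideMs: Long = 5 * 1234L // l1's slide duration: 6170 ms
  val zeroTimeMs: Long = 1234L  // assumed zeroTime, not a multiple of slideMs

  // Rounds down to a multiple of d counted from time 0, ignoring zeroTime.
  def floorFromEpoch(t: Long, d: Long): Long = (t / d) * d

  // Simplified check: is (t - zeroTime) a whole number of slides?
  def isAligned(t: Long): Boolean = (t - zeroTimeMs) % slideMs == 0

  // Times a slice() would visit after flooring both ends from time 0.
  def sliceTimes(fromMs: Long, toMs: Long): List[Long] =
    (floorFromEpoch(fromMs, slideMs) to floorFromEpoch(toMs, slideMs) by slideMs).toList

  def main(args: Array[String]): Unit = {
    val times = sliceTimes(7404L, 19744L)
    println(times)                // List(6170, 12340, 18510)
    println(times.map(isAligned)) // List(false, false, false)
  }
}
```

Every visited time is 4936 ms past a slide boundary relative to zeroTime, so none passes the check, which would be consistent with l2 printing nothing.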
> The fix would be to add a new floor() overload in Time.scala that respects zeroTime when flooring:
> def floor(that: Duration, zeroTime: Time): Time = {
>   val t = that.milliseconds
>   // Round down to a multiple of t measured from zeroTime, then shift back by zeroTime.
>   new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
> }
> Then change DStream.slice to call this new floor() by passing in its zeroTime:
> val alignedToTime = toTime.floor(slideDuration, zeroTime)
> val alignedFromTime = fromTime.floor(slideDuration, zeroTime)
> This way alignedToTime and alignedFromTime are *really* aligned with respect to zeroTime, whose value is generally not 0.
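A sketch of the proposed zeroTime-relative floor on plain `Long` milliseconds, assuming a 1234 ms batch interval, l1's 6170 ms slide duration, and a hypothetical zeroTime of 1234 ms. `floorFromZero` mirrors the arithmetic of the proposed `floor(that, zeroTime)`; the object and helper names are illustrative only.

```scala
// Hypothetical illustration; names and values are assumptions, not Spark code.
object ZeroTimeFloorSketch {
  val slideMs: Long = 5 * 1234L // l1's slide duration: 6170 ms
  val zeroTimeMs: Long = 1234L  // assumed zeroTime, not a multiple of slideMs

  // Same arithmetic as the proposed floor(that, zeroTime):
  // round down to a multiple of d counted from zero, then shift back by zero.
  def floorFromZero(t: Long, d: Long, zero: Long): Long = ((t - zero) / d) * d + zero

  // Simplified check: is (t - zeroTime) a whole number of slides?
  def isAligned(t: Long): Boolean = (t - zeroTimeMs) % slideMs == 0

  // Times a slice() would visit after flooring both ends relative to zeroTime.
  def sliceTimes(fromMs: Long, toMs: Long): List[Long] =
    (floorFromZero(fromMs, slideMs, zeroTimeMs) to floorFromZero(toMs, slideMs, zeroTimeMs) by slideMs).toList

  def main(args: Array[String]): Unit = {
    println(sliceTimes(7404L, 19744L))                   // List(7404, 13574, 19744)
    println(sliceTimes(8000L, 20000L))                   // unaligned inputs floor to the same times
    println(sliceTimes(7404L, 19744L).forall(isAligned)) // true
  }
}
```

Integer division truncates toward zero, so this equals a true floor only for t >= zeroTime; slice() already skips times before zeroTime.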
>