Posted to user@spark.apache.org by Chris Regnier <c....@oculusinfo.com> on 2014/01/11 00:05:42 UTC

Windows of windowed streams not displaying the expected results

Hey everyone,

I have the following example that creates a windowed DStream, creates
another window based on the first, and then prints out the results. I
can run the test back to back: in some cases I get the expected
results, but in other cases I get no output from the second-level
streams.

Is there a specific way to structure the windowed streams so that they
always give the expected results? Or what would cause the second-level
streams to be completely empty in some runs but not others? Right now
it looks like some kind of timing bug.


package com.chris
import org.apache.spark.SparkContext
import org.apache.spark.streaming.DStream
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.Seconds
import org.apache.log4j.Logger
import org.apache.log4j.Level
import scala.collection.mutable.SynchronizedQueue
import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite

class StreamingTestSuite extends FunSuite {
   // Window `root` with the given durations and print each window's
   // contents every time it slides, tagged with `name`.
   def mkSubStream(name: String, root: DStream[Int], windowDuration: Int, slideInterval: Int): DStream[Int] = {
     val w = root.window(Seconds(windowDuration), Seconds(slideInterval))
     w.foreach(rdd => {
       println("New " + name)
       rdd.foreach(k => println("\t" + name + "\t" + k))
     })
     w
   }

   test("Test streaming") {
     Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
     Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
     Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.WARN)

     val sc = new StreamingContext("local", "Watcher", Seconds(1), null, null, null)
     val rddQueue = new SynchronizedQueue[RDD[Int]]()

     // Create the QueueInputDStream and use it to do some processing
     val inputStream = sc.queueStream(rddQueue)
     val w5Stream = mkSubStream("base", inputStream, 5, 5)

     // Create 2 new windowed streams based on the first one, and print out the results.
     // NOTE: sometimes the correct values are printed, other times nothing is printed... don't know why?
     mkSubStream("5+05", w5Stream, 5, 10)
     mkSubStream("10+05", w5Stream, 10, 10)

     sc.start()
     for (i <- 1 to 300) {
       rddQueue += sc.sparkContext.makeRDD(i to i, 1)
     }
     readLine()
     sc.stop()
   }
}
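
For comparison, this is the non-nested structure I'd expect to behave
the same way: each window is built directly on the base input stream
instead of on w5Stream. It's only a sketch reusing the mkSubStream
helper from the test above, and the val names are just illustrative:

     // Sketch only: windows built directly on inputStream instead of
     // nesting a window on top of w5Stream. "5+05" becomes a 5-second
     // window reported every 10 seconds over the base stream.
     val base  = mkSubStream("base",  inputStream, 5, 5)
     val w5x10 = mkSubStream("5+05",  inputStream, 5, 10)
     val w10   = mkSubStream("10+05", inputStream, 10, 10)

If this variant prints consistently while the nested version doesn't,
that would point at the window-of-window path rather than the rest of
the test setup.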

Thanks,
Chris Regnier
-------------------------
Visualization Developer
Oculus Info Inc.


Re: Windows of windowed streams not displaying the expected results

Posted by DMiner <ms...@outlook.com>.
Yes, I've run into this issue as well. I wanted to check whether you
ever fixed it, or whether you found another solution for the same goal.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Windows-of-windowed-streams-not-displaying-the-expected-results-tp466p23096.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org