Posted to user@spark.apache.org by Chris Regnier <c....@oculusinfo.com> on 2014/01/11 00:05:42 UTC
Windows of windowed streams not displaying the expected results
Hey everyone,
I have the following example that creates a windowed DStream, creates
another window based on the first, and then prints out the results. When I
run the test back to back, sometimes I get the expected results, but other
times the 2nd-level streams produce no output at all.
Is there a specific way to structure windowed streams so that they always
give the expected results? Or what would cause the 2nd-level streams to
be completely empty in some runs but not others? Right now it looks like
it might be some kind of timing bug.
package com.chris

import org.apache.spark.SparkContext
import org.apache.spark.streaming.DStream
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.Seconds
import org.apache.log4j.Logger
import org.apache.log4j.Level
import scala.collection.mutable.SynchronizedQueue
import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite

class StreamingTestSuite extends FunSuite {

  // Window `root`, print every element of each windowed RDD, and return the windowed stream
  def mkSubStream(name: String, root: DStream[Int], windowDuration: Int,
                  slideInterval: Int): DStream[Int] = {
    val w = root.window(Seconds(windowDuration), Seconds(slideInterval))
    w.foreach(rdd => {
      println("New " + name)
      rdd.foreach(k => println("\t" + name + "\t" + k))
    })
    w
  }

  test("Test streaming") {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.storage.BlockManager").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.WARN)
    val sc = new StreamingContext("local", "Watcher", Seconds(1), null, null, null)
    val rddQueue = new SynchronizedQueue[RDD[Int]]()
    // Create the QueueInputDStream and use it to do some processing
    val inputStream = sc.queueStream(rddQueue)
    val w5Stream = mkSubStream("base", inputStream, 5, 5)
    // Create 2 new windowed streams based on the other, and print out the results
    // NOTE: sometimes the correct values are printed, other times nothing is printed... don't know why?
    mkSubStream("5+05", w5Stream, 5, 10)
    mkSubStream("10+05", w5Stream, 10, 10)
    sc.start()
    for (i <- 1 to 300) {
      rddQueue += sc.sparkContext.makeRDD(i to i, 1)
    }
    readLine()
    sc.stop()
  }
}
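To make the "expected results" concrete, here is a minimal plain-Scala model (no Spark involved) of what each stream in the test should print. It assumes that each 1-second batch dequeues exactly one RDD from the queue, so batch k carries the single value k, and that a window of duration d emitted at time t unions the parent's RDDs at times t - d + (parent slide), ..., t. That is my understanding of how a windowed DStream slices its parent, not something verified against the Spark source here. The object `NestedWindowModel` and its helpers are hypothetical names for illustration only.

```scala
// Hypothetical model (not Spark code) of the values each stream in the test should print,
// ASSUMING batch k (1-second batch interval) carries exactly the single value k.
object NestedWindowModel {
  // "base" stream: window 5 s, slide 5 s over the input.
  // Emitted at time t, it holds the batches with times in (t - 5, t].
  def base(t: Int): Seq[Int] = ((t - 5 + 1) to t).toList

  // A 2nd-level window of duration `window` over the base stream (base slide = 5 s).
  // Emitted at time t, it unions the base RDDs at t - window + 5, ..., t.
  def nested(window: Int, t: Int): Seq[Int] =
    ((t - window + 5) to t by 5).flatMap(base).toList

  def main(args: Array[String]): Unit = {
    // The base stream emits every 5 s
    for (t <- 5 to 20 by 5) println("base @ " + t + ": " + base(t))
    // Both 2nd-level streams slide every 10 s
    for (t <- 10 to 20 by 10) {
      println("5+05 @ " + t + ": " + nested(5, t))
      println("10+05 @ " + t + ": " + nested(10, t))
    }
  }
}
```

Under these assumptions, "5+05" should print 6 to 10, then 16 to 20, and so on every 10 seconds, while "10+05" should print 1 to 10, then 11 to 20. The window arithmetic alone never predicts an empty 2nd-level stream.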
Thanks,
Chris Regnier
-------------------------
Visualization Developer
Oculus Info Inc.
~
Re: Windows of windowed streams not displaying the expected results
Posted by DMiner <ms...@outlook.com>.
Yes, I have also run into this issue. Did you ever fix it, or do you
have another solution for the same goal?
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Windows-of-windowed-streams-not-displaying-the-expected-results-tp466p23096.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.