Posted to issues@spark.apache.org by "Fei Shao (JIRA)" <ji...@apache.org> on 2017/06/25 10:58:00 UTC
[jira] [Comment Edited] (SPARK-21206) the window slice of Dstream is wrong
[ https://issues.apache.org/jira/browse/SPARK-21206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16062284#comment-16062284 ]
Fei Shao edited comment on SPARK-21206 at 6/25/17 10:57 AM:
------------------------------------------------------------
Hi Sean Owen,
I am sorry, I did not give enough information about this issue.
In my test code:
lines.countByValueAndWindow(Seconds(2), Seconds(8)).foreachRDD(s => { 《=== here windowDuration is 2 seconds and slideDuration is 8 seconds.
===========log begin============
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Current window = [1498383085000 ms, 1498383086000 ms]
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Previous window = [1498383077000 ms, 1498383078000 ms]
17/06/25 17:31:26 INFO ShuffledDStream: Slicing from 1498383077000 ms to 1498383084000 ms (aligned to 1498383077000 ms and 1498383084000 ms) 《=== here the old RDDs are sliced from 1498383077000 ms to 1498383084000 ms, which covers 8 seconds (8 one-second batches). Actually it should be 2 seconds.
===========log end============
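As a hedged check of the log above, the current and previous windows can be recomputed from the two interval formulas in ReducedWindowedDStream.compute(). The Scala sketch below is a hypothetical helper, not Spark code: WindowBounds and its methods are illustrative names, intervals are modelled as inclusive (begin, end) pairs of milliseconds, and the inputs are taken from the issue's log and code (validTime 1498383086000 ms, a 2000 ms window, an 8000 ms slide, and a 1000 ms parent slide from the Seconds(1) batch interval).

```scala
// Hypothetical helper (not Spark code) reproducing the window arithmetic
// of ReducedWindowedDStream.compute(); all times are plain milliseconds.
object WindowBounds {
  // Inclusive (beginTime, endTime) pair, standing in for Spark's Interval
  final case class Interval(begin: Long, end: Long) {
    // Mirrors "currentWindow - slideDuration": shift both ends back by d
    def minus(d: Long): Interval = Interval(begin - d, end - d)
  }

  // currentWindow = [validTime - windowDuration + parent.slideDuration, validTime]
  def currentWindow(validTime: Long, window: Long, parentSlide: Long): Interval =
    Interval(validTime - window + parentSlide, validTime)

  // previousWindow = currentWindow - slideDuration
  def previousWindow(validTime: Long, window: Long, slide: Long, parentSlide: Long): Interval =
    currentWindow(validTime, window, parentSlide).minus(slide)

  def main(args: Array[String]): Unit = {
    val validTime = 1498383086000L // "Time 1498383086000 ms is valid" in the log
    val cur = currentWindow(validTime, 2000L, 1000L)
    val prev = previousWindow(validTime, 2000L, 8000L, 1000L)
    println(s"Current window = [${cur.begin} ms, ${cur.end} ms]")
    println(s"Previous window = [${prev.begin} ms, ${prev.end} ms]")
    // With a window shorter than the slide, the two intervals cannot overlap
    println(s"Windows overlap: ${prev.end >= cur.begin}")
  }
}
```

Run as-is, it prints the same two intervals as the DEBUG lines above. Because the 2-second window is shorter than the 8-second slide, the two windows do not overlap, and the six batch times between them (1498383079000 ms to 1498383084000 ms) belong to neither window.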
===========code in ReducedWindowedDStream.scala begin============
  override def compute(validTime: Time): Option[RDD[(K, V)]] = {
    val reduceF = reduceFunc
    val invReduceF = invReduceFunc
    val currentTime = validTime
    val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,
      currentTime)
    val previousWindow = currentWindow - slideDuration
    logDebug("Window time = " + windowDuration)
    logDebug("Slide time = " + slideDuration)
    logDebug("Zero time = " + zeroTime)
    logDebug("Current window = " + currentWindow)
    logDebug("Previous window = " + previousWindow)
    //   _____________________________
    //  |  previous window   _________|___________________
    //  |___________________|       current window        |  --------------> Time
    //                      |_____________________________|
    //
    //  |_________ _________|         |_________ _________|
    //            |                             |
    //            V                             V
    //         old RDDs                      new RDDs
    //
    // Get the RDDs of the reduced values in "old time steps"
    val oldRDDs =
      reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration) 《== I think this line should be "reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime + windowDuration - parent.slideDuration)"
    logDebug("# old RDDs = " + oldRDDs.size)
    // Get the RDDs of the reduced values in "new time steps"
    val newRDDs =
      reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime) 《== and this line should be "reducedStream.slice(previousWindow.endTime + windowDuration - parent.slideDuration, currentWindow.endTime)"
    logDebug("# new RDDs = " + newRDDs.size)
===========code in ReducedWindowedDStream.scala end============
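The two slice() calls can be evaluated the same way. The Scala sketch below is a simplified, hypothetical model (SliceModel and its members are illustrative names, not Spark API): it lists the batch times each slice would cover, stepping by the 1000 ms parent slide with both ends inclusive and ignoring zeroTime, and it models the update drawn in the ASCII diagram as previous window + new RDDs - old RDDs, counted per batch time. It evaluates both the bounds in compute() and the replacement bounds proposed in the comment.

```scala
import scala.collection.mutable

// Hypothetical model (not Spark code) of the old/new slices in
// ReducedWindowedDStream.compute(); times are plain milliseconds.
object SliceModel {
  val parentSlide = 1000L                               // batch interval, Seconds(1)
  val windowDuration = 2000L                            // Seconds(2)
  val currentWindow = (1498383085000L, 1498383086000L)  // from the DEBUG log
  val previousWindow = (1498383077000L, 1498383078000L) // from the DEBUG log

  // Batch times covered by slice(begin, end): both ends inclusive, one per batch.
  // Ignores zeroTime and RDD validity.
  def slice(begin: Long, end: Long): Seq[Long] = begin to end by parentSlide

  // "previous + new - old", tracked as a count per batch time;
  // returns the batch times still counted after the update.
  def update(prev: Seq[Long], oldRdds: Seq[Long], newRdds: Seq[Long]): Seq[Long] = {
    val counts = mutable.Map[Long, Int]().withDefaultValue(0)
    prev.foreach(t => counts(t) += 1)
    newRdds.foreach(t => counts(t) += 1)
    oldRdds.foreach(t => counts(t) -= 1)
    counts.toSeq.filter(_._2 > 0).map(_._1).sorted
  }

  val prevTimes = slice(previousWindow._1, previousWindow._2)

  // Bounds as currently written in compute()
  val oldNow = slice(previousWindow._1, currentWindow._1 - parentSlide)
  val newNow = slice(previousWindow._2 + parentSlide, currentWindow._2)

  // Bounds proposed in the comment
  val oldProposed = slice(previousWindow._1, currentWindow._1 + windowDuration - parentSlide)
  val newProposed = slice(previousWindow._2 + windowDuration - parentSlide, currentWindow._2)

  def main(args: Array[String]): Unit = {
    println(s"compute(): old ${oldNow.head}..${oldNow.last}, new ${newNow.head}..${newNow.last}, " +
      s"left ${update(prevTimes, oldNow, newNow)}")
    println(s"proposed:  old ${oldProposed.head}..${oldProposed.last}, new ${newProposed.head}..${newProposed.last}, " +
      s"left ${update(prevTimes, oldProposed, newProposed)}")
  }
}
```

Under this model, the bounds in compute() reproduce the logged old slice (1498383077000 ms to 1498383084000 ms) and leave only 1498383085000 ms and 1498383086000 ms, the current window, while the proposed bounds subtract every batch and leave nothing. The model does not cover the reduce functions themselves, so it only checks the interval bookkeeping.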
was (Author: robin shao): [earlier revision, identical to the comment above]
> the window slice of Dstream is wrong
> ------------------------------------
>
> Key: SPARK-21206
> URL: https://issues.apache.org/jira/browse/SPARK-21206
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.1.0
> Reporter: Fei Shao
>
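> The code is:
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> val conf = new SparkConf().setAppName("testDstream").setMaster("local[4]")
> val ssc = new StreamingContext(conf, Seconds(1))
> ssc.checkpoint("path")
> // "IP" and PORT are placeholders for the socket source
> val lines = ssc.socketTextStream("IP", PORT)
> // windowDuration = 2 seconds, slideDuration = 8 seconds
> lines.countByValueAndWindow(Seconds(2), Seconds(8)).foreachRDD(s => {
>   println("RDD ID IS : " + s.id)
>   s.foreach(e => println("data is " + e._1 + " :" + e._2))
>   println()
> })
> ssc.start()
> ssc.awaitTermination()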
> The result is wrong.
> I checked the log; it showed:
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Time 1498383086000 ms is valid
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Window time = 2000 ms
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Slide time = 8000 ms
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Zero time = 1498383078000 ms
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Current window = [1498383085000 ms, 1498383086000 ms]
> 17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Previous window = [1498383077000 ms, 1498383078000 ms]
> 17/06/25 17:31:26 INFO ShuffledDStream: Slicing from 1498383077000 ms to 1498383084000 ms (aligned to 1498383077000 ms and 1498383084000 ms)
> 17/06/25 17:31:26 INFO ShuffledDStream: Time 1498383078000 ms is invalid as zeroTime is 1498383078000 ms , slideDuration is 1000 ms and difference is 0 ms
> 17/06/25 17:31:26 DEBUG ShuffledDStream: Time 1498383079000 ms is valid
> 17/06/25 17:31:26 DEBUG MappedDStream: Time 1498383079000 ms is valid
> The slice time is wrong.
> [BTW]: Team members,
> if it is a bug, please don't fix it. I will try to fix it myself. Thanks :)
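The ShuffledDStream lines at the end of the quoted log show that not every batch time in the sliced range yields an RDD. As a hedged illustration, the Scala sketch below assumes a validity rule inferred only from the fields that log message reports (zeroTime, slideDuration and the difference): a time counts only if it is strictly after zeroTime and a whole number of 1000 ms slides after it. SliceValidity and its methods are hypothetical names, not Spark API, and the rule is a simplification rather than Spark's implementation.

```scala
// Hypothetical, simplified model (not Spark code) of which batch times in a
// slice would yield RDDs. The validity rule is an assumption inferred from the
// "is invalid as zeroTime is ..., slideDuration is ... and difference is ..." log line.
object SliceValidity {
  // Assumed rule: strictly after zeroTime and a whole number of slides after it
  def isValid(time: Long, zeroTime: Long, slide: Long): Boolean =
    time > zeroTime && (time - zeroTime) % slide == 0

  // Batch times in [begin, end], stepping by slide, that would produce an RDD
  def producedTimes(begin: Long, end: Long, zeroTime: Long, slide: Long): Seq[Long] =
    (begin to end by slide).filter(t => isValid(t, zeroTime, slide))

  def main(args: Array[String]): Unit = {
    val zeroTime = 1498383078000L // ShuffledDStream zeroTime reported in the log
    println(isValid(1498383078000L, zeroTime, 1000L)) // false: difference is 0 ms
    println(isValid(1498383079000L, zeroTime, 1000L)) // true
    val times = producedTimes(1498383077000L, 1498383084000L, zeroTime, 1000L)
    println(s"${times.size} RDDs, from ${times.head} to ${times.last}")
  }
}
```

Under that assumed rule, of the eight batch times between 1498383077000 ms and 1498383084000 ms only the six from 1498383079000 ms onward would yield RDDs, which is consistent with the log marking 1498383078000 ms invalid and 1498383079000 ms valid.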
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)