Posted to user@spark.apache.org by Sanjay Awatramani <sa...@yahoo.com> on 2014/03/21 15:33:53 UTC

Sliding Window operations do not work as documented

Hi,

I want to run a map/reduce process over the last 5 seconds of data, every 4 seconds. This is quite similar to the pictorial sliding-window example under the Window Operations section of http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html .

The RDDs returned by the window transformation are incorrect in my case. To investigate this further, I ran a series of examples with varying values of window length & slide interval. Summary of the test results:
(window length, slide interval) -> result
(3,1) -> success
(4,2) -> success
(3,2) -> fail
(4,3) -> fail
(5,4) -> fail
(5,2) -> fail

The only condition mentioned in the doc is that the two values (5 & 4) should be multiples of the batch interval (1 in my case), and obviously I get a runtime error if I attempt to violate this condition. Looking at my results, it seems that failures occur whenever the window length isn't a multiple of the slide interval.
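That documented multiple-of-batch-interval constraint amounts to a simple divisibility check. A minimal sketch (a hypothetical helper for illustration, not Spark's actual validation code):

```java
// Sketch of the documented constraint: window length and slide interval
// must both be integer multiples of the batch interval (all in ms).
// Hypothetical helper, not part of Spark's API.
public class DurationCheck {
    static boolean validDurations(long batchMs, long windowMs, long slideMs) {
        return windowMs % batchMs == 0 && slideMs % batchMs == 0;
    }

    public static void main(String[] args) {
        long batch = 60_000;                                             // 1-minute batches
        System.out.println(validDurations(batch, 5 * batch, 4 * batch)); // true
        System.out.println(validDurations(batch, 5 * batch, 90_000));    // false: 1.5 batches
    }
}
```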

My code:
JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000)); // batch interval: 1 minute
JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
JavaDStream<String> objWindow = inputStream.window(new Duration(windowLen * 60 * 1000), new Duration(slideInt * 60 * 1000)); // window & slide in minutes
objWindow.dstream().saveAsTextFiles("/Output", "");

Detailed results:
(3,1) -> success
@t_0: [inputStream's RDD@t_0]
@t_1: [inputStream's RDD@t_0,1]
@t_2: [inputStream's RDD@t_0,1,2]
@t_3: [inputStream's RDD@t_1,2,3]
@t_4: [inputStream's RDD@t_2,3,4]
@t_5: [inputStream's RDD@t_3,4,5]

(4,2) -> success
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: [inputStream's RDD@t_2,3,4,5]

(3,2) -> fail
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_2,3]    //(expected RDD@t_1,2,3)
@t_4: nothing
@t_5: [inputStream's RDD@t_4,5]    //(expected RDD@t_3,4,5)

(4,3) -> fail
@t_0: nothing
@t_1: nothing
@t_2: [inputStream's RDD@t_0,1,2]
@t_3: nothing
@t_4: nothing
@t_5: [inputStream's RDD@t_3,4,5]    //(expected RDD@t_2,3,4,5)

(5,4) -> fail
@t_0: nothing
@t_1: nothing
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: nothing
@t_6: nothing
@t_7: [inputStream's RDD@t_4,5,6,7]    //(expected RDD@t_3,4,5,6,7)

(5,2) -> fail
@t_0: nothing
@t_1: [inputStream's RDD@t_0,1]
@t_2: nothing
@t_3: [inputStream's RDD@t_0,1,2,3]
@t_4: nothing
@t_5: [inputStream's RDD@t_2,3,4,5]    //(expected RDD@t_1,2,3,4,5)
@t_6: nothing
@t_7: [inputStream's RDD@t_4,5,6,7]    //(expected RDD@t_3,4,5,6,7)
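The expected contents above can be reproduced with a plain-Java sketch (no Spark involved) of my reading of the documented semantics: a window is emitted every slideInt batches and covers the last windowLen batches. Times are in units of the batch interval; the helper name is mine, not Spark's.

```java
import java.util.ArrayList;
import java.util.List;

// Simulates which input batches each emitted window should cover,
// per the documented semantics as I understand them.
public class ExpectedWindows {

    static List<List<Integer>> expectedWindows(int windowLen, int slideInt, int numBatches) {
        List<List<Integer>> windows = new ArrayList<>();
        for (int t = 0; t < numBatches; t++) {
            if ((t + 1) % slideInt != 0) {
                continue; // no window ends at this batch
            }
            List<Integer> w = new ArrayList<>();
            for (int i = Math.max(0, t - windowLen + 1); i <= t; i++) {
                w.add(i); // batch i falls inside the window ending at t
            }
            windows.add(w);
        }
        return windows;
    }

    public static void main(String[] args) {
        // The (3,2) case: expected windows [0,1], [1,2,3], [3,4,5],
        // whereas the observed output was [0,1], [2,3], [4,5].
        System.out.println(expectedWindows(3, 2, 6)); // prints [[0, 1], [1, 2, 3], [3, 4, 5]]
    }
}
```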

I have run all the above examples twice to be sure!
I believe either my understanding of the sliding-window mechanism is incorrect or there is a problem in the mechanism itself.

Regards,
Sanjay

Re: Sliding Window operations do not work as documented

Posted by Tathagata Das <ta...@gmail.com>.
Hello Sanjay,

Yes, your understanding of the lazy semantics is correct. But ideally
every batch should be read at the batch interval provided in the
StreamingContext. Can you open a JIRA on this?


Re: Sliding Window operations do not work as documented

Posted by Sanjay Awatramani <sa...@yahoo.com>.
Hi All,

I found out why this problem exists. Consider the following scenario:
- a DStream is created from any source (I've checked with file and socket),
- no actions are applied to this DStream directly, and
- a sliding window operation is applied to this DStream, with an action applied only to the windowed stream.
In this case, Spark will not even read the input stream in batches where no window computation is due, i.e. at batch times that aren't aligned with the slide interval. Put another way, it won't read the input when it doesn't have to apply the window function. This happens because all transformations in Spark are lazy.
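The effect of laziness can be seen in a toy sketch (an illustration of the general principle, not of Spark's real internals): a transformation only composes functions, and the source is actually read only when an action forces evaluation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy lazy pipeline: map() records work to do; collect() (the "action")
// is what actually triggers the read of the source.
public class LazyDemo {
    static final List<String> readLog = new ArrayList<>();

    static class Lazy<T> {
        final Supplier<T> compute;
        Lazy(Supplier<T> compute) { this.compute = compute; }

        static Lazy<String> fromSource(String name) {
            return new Lazy<>(() -> {
                readLog.add(name); // the read happens here, and only on demand
                return "data-from-" + name;
            });
        }

        <R> Lazy<R> map(Function<T, R> f) {
            return new Lazy<>(() -> f.apply(compute.get())); // composes; nothing read yet
        }

        T collect() { return compute.get(); } // the action forces the read
    }

    public static void main(String[] args) {
        Lazy<String> windowed = Lazy.fromSource("batch-0").map(String::toUpperCase);
        System.out.println(readLog.isEmpty()); // true: transformation alone reads nothing
        windowed.collect();
        System.out.println(readLog);           // [batch-0]: the action forced the read
    }
}
```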

How to fix this or work around it (note the print() call):
JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(1 * 60 * 1000)); // batch interval: 1 minute
JavaDStream<String> inputStream = stcObj.textFileStream("/Input");
inputStream.print(); // workaround: an action on the raw stream forces a read every batch
JavaDStream<String> objWindow = inputStream.window(new Duration(windowLen * 60 * 1000), new Duration(slideInt * 60 * 1000));
objWindow.dstream().saveAsTextFiles("/Output", "");


The "Window operations" example in the streaming guide implies that Spark reads the stream in every batch, which does not happen here because of lazy transformations.
In most uses of a sliding window, no action will be taken on the pre-window batches, so my gut feeling was that Streaming would read every batch as long as some action is taken on the windowed stream.

Regards,
Sanjay


