Posted to user@flink.apache.org by Nirmalya Sengupta <se...@gmail.com> on 2015/12/14 02:55:57 UTC

Behaviour of CountWindowAll

Hello Fabian (and others),

Sorry to bring up the much-flogged topic of countWindowAll() again, but I just
want to be sure that I understand it correctly.

For a dataset like the following (partial):

-----------------------------------------

probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
probe-dccefede,199,749.25,78.6057,1448028160,27.46
probe-f29f9662,199,821.81,81.7831,1448028160,22.35
probe-5dac1d9f,195,870.71,83.1028,1448028160,15.98
probe-6c75cfbe,198,830.06,82.5607,1448028160,30.02
probe-4d78b545,204,778.42,78.412,1448028160,25.92
probe-400c5cdf,204,711.65,73.585,1448028160,27.18
...........
-----------------------------------------

The following code:

-----------------------------------------
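import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setParallelism(1)

// readIncomingReadings: CSV-parsing helper, not shown in this post
val readings =
  readIncomingReadings(env, "./sampleIOTTiny.csv")
    .map(e => (e.sensorUUID, e.ambientTemperature))
    .countWindowAll(4, 1)  // window of 4 elements, sliding by 1
    .maxBy(1)              // tuple with the highest ambientTemperature

readings.print()
env.execute()              // start the job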
-------------------------------------------

produces this (partial):

------------------------------------------
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-400c5cdf,27.18)
......
------------------------------------------

I am trying to explain the first three lines of the output. When I call
countWindowAll(4,1), don't I instruct Flink to '*wait until you get at
least the first 4 readings before you calculate the maximum*'? It appears that
Flink is calculating max() for every incoming tuple it adds to the
window. What, then, is the correct and complete interpretation of the
computation?
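
For comparison, the expectation described in the question (a maximum computed only over complete groups of 4 consecutive readings) is what Scala's own collection method `sliding(4, 1)` produces on a finite sequence. The sketch below is plain Scala, not the Flink API, and the object name `ExpectedSlidingMax` is hypothetical; it replays the seven sample readings shown above. Note that on an input shorter than 4 elements, `sliding` yields one shorter group instead of nothing.

```scala
// Plain-Scala sketch (not the Flink API) of the behaviour the question expects:
// one max per complete group of 4 consecutive readings.
object ExpectedSlidingMax {
  type Reading = (String, Double) // (sensorUUID, ambientTemperature)

  // The seven sample readings from the dataset above.
  val sample: List[Reading] = List(
    ("probe-f076c2b0", 29.37), ("probe-dccefede", 27.46),
    ("probe-f29f9662", 22.35), ("probe-5dac1d9f", 15.98),
    ("probe-6c75cfbe", 30.02), ("probe-4d78b545", 25.92),
    ("probe-400c5cdf", 27.18)
  )

  // Reading with the highest temperature; keeps the first one on ties.
  def hottest(window: Seq[Reading]): Reading =
    window.reduce((a, b) => if (b._2 > a._2) b else a)

  // sliding(size, 1) yields only full groups once the input has >= size elements.
  def fullWindowMax(readings: Seq[Reading], size: Int = 4): List[Reading] =
    readings.sliding(size, 1).map(hottest).toList

  def main(args: Array[String]): Unit =
    fullWindowMax(sample).foreach(println)
}
```

With these seven readings this yields four results, (probe-f076c2b0,29.37) followed by (probe-6c75cfbe,30.02) three times, rather than one result per incoming reading.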

-- N

-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."

Re: Behaviour of CountWindowAll

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Nirmalya,
when using count windows, the window triggers every time “slide-size” elements have been received. Since the slide size in your example is 1, it emits a new max for every element it receives. Until 4 elements have arrived, that max is computed over the elements seen so far, so your first three outputs come from windows holding only 1, 2 and 3 readings. Once it has accumulated 4 elements, it starts evicting the oldest element for every new element that arrives, before computing the max.
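
The behaviour described above can be modelled in a few lines of plain Scala, assuming countWindowAll(size, slide) acts like a count trigger that fires every `slide` elements combined with a count evictor that keeps only the last `size` elements at each firing. This is a simplified sketch under that assumption, not Flink's actual operator; the object name `CountWindowModel` is hypothetical. It replays the seven sample readings from the question.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified plain-Scala model (not Flink's operator) of countWindowAll(size, slide):
// fire every `slide` elements, keeping only the last `size` elements at each firing.
object CountWindowModel {
  type Reading = (String, Double) // (sensorUUID, ambientTemperature)

  // The seven sample readings from the question's dataset.
  val sample: List[Reading] = List(
    ("probe-f076c2b0", 29.37), ("probe-dccefede", 27.46),
    ("probe-f29f9662", 22.35), ("probe-5dac1d9f", 15.98),
    ("probe-6c75cfbe", 30.02), ("probe-4d78b545", 25.92),
    ("probe-400c5cdf", 27.18)
  )

  // Returns the window contents seen at each firing, oldest element first.
  def firings(input: Seq[Reading], size: Int, slide: Int): List[List[Reading]] = {
    val buffer = ArrayBuffer.empty[Reading]
    val fired = ArrayBuffer.empty[List[Reading]]
    var sinceLastFiring = 0
    for (r <- input) {
      buffer += r
      sinceLastFiring += 1
      if (sinceLastFiring == slide) { // trigger: `slide` new elements have arrived
        sinceLastFiring = 0
        while (buffer.size > size) buffer.remove(0) // evictor: drop the oldest
        fired += buffer.toList
      }
    }
    fired.toList
  }

  // Rough stand-in for maxBy(1) per firing; keeps the first reading on ties.
  def maxPerFiring(input: Seq[Reading], size: Int, slide: Int): List[Reading] =
    firings(input, size, slide).map(_.reduce((a, b) => if (b._2 > a._2) b else a))

  def main(args: Array[String]): Unit =
    firings(sample, 4, 1).zip(maxPerFiring(sample, 4, 1)).foreach { case (w, m) =>
      println(s"window size ${w.size}: max $m")
    }
}
```

Running `main` shows firings over windows of 1, 2, 3 and then 4 readings, with (probe-f076c2b0,29.37) as the max for the first four and (probe-6c75cfbe,30.02) for the next three. The readings after the seventh are not shown in the question, so the model stops there. With `slide = 2` the same model fires only after every second reading.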

Cheers,
Aljoscha