Posted to user@flink.apache.org by Raj Kumar <sm...@gmail.com> on 2017/07/28 16:31:01 UTC

Access Sliding window

Hi,

I am using a sliding window to monitor server performance. I need to keep
track of the number of HTTP requests generated and alert the user when the
request rate gets too high (a sliding window of 6 hours that slides every 15
minutes). The aggregate count of HTTP requests is evaluated in each 15-minute
sliding window. I need to keep track of a running average of these aggregate
counts across the different 15-minute sliding windows, so that I can raise an
alert when the load exceeds the average + 1 standard deviation.

How can we achieve this? How can we keep track of the running average across
all the sliding windows?



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Access-Sliding-window-tp14519.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Access Sliding window

Posted by Raj Kumar <sm...@gmail.com>.
Thanks Fabian. I have one more question.
When we connect the two streams and apply a CoProcessFunction, there are two
separate methods, one for each stream. Which stream's records do we need to
store in state? Will the CoProcessFunction trigger automatically once the
other stream's data arrives, or do we need to set a timer to trigger it?

.process(new CoProcessFunction<ProcessedLogData, String, String>() {
    @Override
    public void processElement1(ProcessedLogData processedLogData,
                                Context context,
                                Collector<String> collector) throws Exception {
        // handle a record from the first input (ProcessedLogData)
    }

    @Override
    public void processElement2(String s, Context context,
                                Collector<String> collector) throws Exception {
        // handle a record from the second input (String)
    }
})




Re: Access Sliding window

Posted by Raj Kumar <sm...@gmail.com>.
Hi Fabian,

Could you please answer the last set of questions I posted on the forum?

Thanks.

(In reply to Fabian Hueske, Friday, August 4, 2017)

Re: Access Sliding window

Posted by Fabian Hueske <fh...@gmail.com>.
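Use TimeWindow.getStart() or TimeWindow.getEnd() to get the start or end
timestamp of the window, e.g., in a WindowFunction that is combined with an
incremental ReduceFunction, as described here:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#incremental-window-aggregation-with-reducefunction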
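To illustrate what TimeWindow.getStart() and TimeWindow.getEnd() give you, here is a minimal plain-Java sketch (no Flink dependency) of how a tumbling window's bounds follow from an element's timestamp. It assumes a zero window offset and non-negative millisecond timestamps; the WindowBounds class and its methods are hypothetical. Tagging both the 15-minute counts and the 6-hour statistics with the same window bound (start or end) gives the two streams matching timestamps to join on.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of Flink): derives tumbling-window bounds from
// an element timestamp, assuming offset 0 and non-negative millisecond timestamps.
final class WindowBounds {
    static final long FIFTEEN_MINUTES = TimeUnit.MINUTES.toMillis(15);

    // Inclusive start of the window containing `timestamp` (cf. TimeWindow.getStart()).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Exclusive end of that window (cf. TimeWindow.getEnd()).
    static long windowEnd(long timestamp, long size) {
        return windowStart(timestamp, size) + size;
    }

    // Largest timestamp still inside the window (cf. TimeWindow.maxTimestamp()).
    static long maxTimestamp(long timestamp, long size) {
        return windowEnd(timestamp, size) - 1;
    }
}
```

For example, an event at 00:20 (1,200,000 ms) falls into the window [00:15, 00:30). The end is exclusive, and as far as I know Flink's event-time window operator stamps each emitted result with maxTimestamp() (end - 1), which is why the sketch shows that value too.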

(In reply to Raj Kumar, 2017-08-04 22:43 GMT+02:00)

Re: Access Sliding window

Posted by Raj Kumar <sm...@gmail.com>.
Thanks Fabian.

The incoming events have timestamps. Once I aggregate the first stream to get
counts and compute the mean/standard deviation in the second, should the new
timestamps be the window start time? How do I tackle this issue?





Re: Access Sliding window

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Raj,

you have to combine two streams. The first stream has the running avg +
std-dev over the last 6 hours; the second stream has the 15-minute counts.
Both streams emit one record every 15 minutes. What you want to do is join
the two records from both streams that have the same timestamp.
You do that by connecting the streams and implementing a CoProcessFunction.
By the way, CoProcessFunction implements RichFunction, so you do not need a
separate "rich" variant.
The function must be stateful, because you need to hold on to the first
record that is received from either input and wait for the record from the
other input before you can join them, i.e., compare the count against the
avg + std-dev. So whichever record you receive first, you put into state
and wait for the other record with the same timestamp to arrive. After the
join, you clear the state.
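The stateful join described above can be simulated in plain Java as follows. This is a sketch, not the Flink API: the hypothetical methods onCount and onThreshold stand in for processElement1 and processElement2, and two HashMaps keyed by the window timestamp stand in for Flink keyed state (in a real job this would be, for example, a ValueState on streams keyed by timestamp). The Threshold class is also hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation (not Flink code) of a stateful two-input join:
// whichever record arrives first is buffered until its partner arrives.
final class CountThresholdJoin {
    // Hypothetical record holding the 6-hour statistics.
    static final class Threshold {
        final double avg;
        final double stdDev;
        Threshold(double avg, double stdDev) { this.avg = avg; this.stdDev = stdDev; }
    }

    // Stand-ins for keyed state: one pending record per timestamp and input.
    private final Map<Long, Long> pendingCounts = new HashMap<>();
    private final Map<Long, Threshold> pendingThresholds = new HashMap<>();
    final List<String> alerts = new ArrayList<>();

    // Counterpart of processElement1: a 15-minute count arrives.
    void onCount(long timestamp, long count) {
        Threshold t = pendingThresholds.remove(timestamp); // join and clear state
        if (t == null) {
            pendingCounts.put(timestamp, count);           // wait for the threshold
        } else {
            compare(timestamp, count, t);
        }
    }

    // Counterpart of processElement2: avg/std-dev for a timestamp arrives.
    void onThreshold(long timestamp, Threshold t) {
        Long count = pendingCounts.remove(timestamp);     // join and clear state
        if (count == null) {
            pendingThresholds.put(timestamp, t);          // wait for the count
        } else {
            compare(timestamp, count, t);
        }
    }

    // Emit an alert when the count exceeds avg + 1 std-dev.
    private void compare(long timestamp, long count, Threshold t) {
        if (count > t.avg + t.stdDev) {
            alerts.add("ALERT at " + timestamp + ": count " + count + " > " + (t.avg + t.stdDev));
        }
    }

    int pendingStateSize() { return pendingCounts.size() + pendingThresholds.size(); }
}
```

State for a timestamp is removed as soon as both sides have been seen. A record whose partner never arrives would stay buffered, which a real job would need to clean up separately (for example with a timer).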

Hope that helps,
Fabian

(In reply to Raj Kumar, 2017-08-04 7:49 GMT+02:00)

Re: Access Sliding window

Posted by Raj Kumar <sm...@gmail.com>.
Thanks Fabian. Your suggestion helped, but I am stuck at the 3rd step.

1. I didn't completely understand step 3. What should the process function
look like? Why does it need to be stateful? Can you please provide more
details on this?
2. In the stateful function, do we need a ValueState? Knowing which details
we need to store would help me implement the use case.
3. Moreover, I see that RichProcessFunction is deprecated. What can we use in
place of RichProcessFunction?




Re: Access Sliding window

Posted by Fabian Hueske <fh...@gmail.com>.
The average would be computed over the aggregated 15-minute count values.
Every 15 minutes, the sliding window would emit the average of all records
that arrived within the last 6 hours.
Since the preceding 15-minute tumbling window emits 1 record every 15 minutes,
this would be the average over 24 records (6 h / 15 min = 24).

So, it would be a running average with a "granularity" of 15 minutes.
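To see why each 6-hour window covers 24 counts and why a result comes out every 15 minutes, here is a plain-Java sketch of sliding-window assignment, modeled on how Flink's SlidingEventTimeWindows assigns an element to every window that contains it. It assumes a zero offset and non-negative millisecond timestamps; the SlidingAssignment class and its method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of Flink): lists every sliding window
// [start, start + size) that contains a given timestamp (offset 0 assumed).
final class SlidingAssignment {
    static final long SIZE = TimeUnit.HOURS.toMillis(6);
    static final long SLIDE = TimeUnit.MINUTES.toMillis(15);

    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Newest window start at or before the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

An element at 10:01 (36,060,000 ms) lands in 24 windows, the newest starting at 10:00 and the oldest at 04:15. Because one window ends every 15 minutes, the average is refreshed every 15 minutes rather than once per 6 hours.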

Best, Fabian

(In reply to Raj Kumar, 2017-08-01 4:48 GMT+02:00)

Re: Access Sliding window

Posted by Raj Kumar <sm...@gmail.com>.
Thanks Fabian. That helps.

I have one more question. In the second step, since I am using the window
function apply(), will the calculated average be a running average, or will
it be computed at the end of the 6-hour window?




Re: Access Sliding window

Posted by Fabian Hueske <fh...@gmail.com>.
You can compute the average and std-dev in a WindowFunction that iterates
over all records in the window (6 h / 15 min = 24 records).
WindowFunction [1] and CoProcessFunction [2] are described in the docs.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/windows.html#windowfunction---the-generic-case
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/process_function.html#low-level-joins
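Here is a minimal sketch of the statistics such a WindowFunction could compute while iterating over the window's counts. It uses Welford's single-pass algorithm (my choice, not something prescribed by Flink) and the population standard deviation; both are assumptions, as is the hypothetical WindowStats class. threshold() implements the "average + 1 std deviation" rule from the original question.

```java
// Hypothetical helper (not part of Flink): mean and population standard deviation
// of the 15-minute counts in one 6-hour window, via Welford's single-pass algorithm.
final class WindowStats {
    final double mean;
    final double stdDev;

    private WindowStats(double mean, double stdDev) {
        this.mean = mean;
        this.stdDev = stdDev;
    }

    // Iterates once over the counts, e.g. the Iterable handed to a window function.
    static WindowStats of(Iterable<Long> counts) {
        long n = 0;
        double mean = 0.0;
        double m2 = 0.0; // running sum of squared deviations from the mean
        for (long c : counts) {
            n++;
            double delta = c - mean;
            mean += delta / n;
            m2 += delta * (c - mean);
        }
        if (n == 0) {
            throw new IllegalArgumentException("window contains no counts");
        }
        // Population variance (divide by n); use n - 1 for the sample variant.
        return new WindowStats(mean, Math.sqrt(m2 / n));
    }

    // Alert threshold from the original question: average + 1 standard deviation.
    double threshold() {
        return mean + stdDev;
    }
}
```

Welford's update avoids the loss of precision that the naive "sum of squares minus square of sum" formula can suffer, and it only needs one pass over the window contents.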

(In reply to Raj Kumar, 2017-07-31 19:23 GMT+02:00)

Re: Access Sliding window

Posted by Raj Kumar <sm...@gmail.com>.
Thanks Fabian.

Can you provide more details about the implementation of step 2 and step 3?

How do I calculate the average and standard deviation?
How does the CoProcessFunction work? Can you provide details about these two?




Re: Access Sliding window

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

I would first compute the 15-minute counts. Based on these counts, you
compute the threshold by computing the average and std-dev, and then you
compare the counts with the threshold.
In pseudo code this could look as follows:

DataStream requests = ...
// 1) count requests per 15-minute tumbling window
DataStream counts = requests.timeWindow(15 mins).reduce(...)
// 2) every 15 mins, compute average and std-dev over the last 6 hours
//    (sliding window: size 6 hours, slide 15 mins)
DataStream thresholds = counts.timeWindow(6 hours, 15 mins).apply(...)
// 3) stateful CoProcessFunction that compares counts with thresholds
DataStream alerts = counts.connect(thresholds).process(...)
// (on a non-keyed DataStream, the window operators are timeWindowAll(...))
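The three-step pipeline in the pseudo code above can be simulated end to end on a finite list of request timestamps. This is a batch sketch in plain Java, not Flink code. To keep numbers small it assumes timestamps in minutes, keys each 15-minute count by its window end, and treats the 6 hours ending at that timestamp (24 slots, the current one included) as the window whose statistics are compared with the count. Empty 15-minute windows produce no count, which as far as I know matches Flink, where windows without elements do not fire. The AlertPipeline class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Batch simulation (not Flink code) of the three streaming steps, with
// timestamps in minutes (an assumption made only to keep numbers small).
final class AlertPipeline {
    static final long SLIDE = 15;     // 15-minute tumbling counts / slide
    static final long SIZE = 6 * 60;  // 6-hour sliding window = 24 slides

    // Step 1: count requests per 15-minute tumbling window, keyed by window end.
    static TreeMap<Long, Long> countsPerWindow(List<Long> requestTimes) {
        TreeMap<Long, Long> counts = new TreeMap<>();
        for (long t : requestTimes) {
            long windowEnd = t - (t % SLIDE) + SLIDE;
            counts.merge(windowEnd, 1L, Long::sum);
        }
        return counts;
    }

    // Steps 2 and 3: for every count, compute mean and population std-dev over
    // the counts in the 6 hours ending at its timestamp (itself included),
    // then flag the count if it exceeds mean + 1 std-dev.
    static List<Long> alertTimes(TreeMap<Long, Long> counts) {
        List<Long> alerts = new ArrayList<>();
        for (Map.Entry<Long, Long> entry : counts.entrySet()) {
            long end = entry.getKey();
            Map<Long, Long> window = counts.subMap(end - SIZE, false, end, true);
            long n = 0;
            double mean = 0.0;
            double m2 = 0.0;
            for (long c : window.values()) { // Welford's single-pass update
                n++;
                double delta = c - mean;
                mean += delta / n;
                m2 += delta * (c - mean);
            }
            double threshold = mean + Math.sqrt(m2 / n);
            if (entry.getValue() > threshold) {
                alerts.add(end);
            }
        }
        return alerts;
    }
}
```

With three quiet windows of 10 requests each followed by a burst of 40, only the fourth window (ending at minute 60) exceeds mean + 1 std-dev (about 30.5). A real job would express the same steps with Flink's window operators and a CoProcessFunction.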

Hope this helps,
Fabian

(In reply to Raj Kumar, 2017-07-28 18:31 GMT+02:00)