Posted to user@flink.apache.org by Raj Kumar <sm...@gmail.com> on 2017/07/24 00:47:20 UTC

Count Different Codes in a Window

Hi,

we have a requirement to aggregate the data every 10 minutes and write the
aggregated results to Elasticsearch only ONCE per window. Right now, we
iterate over the window's Iterable to count the different status codes.
Is there a better way to count them?

public void apply(TimeWindow timeWindow,
                  Iterable<Tuple4<String, Long, String, String>> iterable,
                  Collector<Tuple4<String, Long, String, String>> collector) throws Exception {

    // Java zero-initializes new long arrays, so no Arrays.fill is needed.
    long[] counts = new long[10];

    // count the different types of records in the window
    for (Tuple4<String, Long, String, String> in : iterable) {
        counts[0]++;                                   // total records
        if (in.f2 != null) {                           // f2 = status code
            if (in.f2.startsWith("5"))      counts[1]++;
            else if (in.f2.startsWith("4")) counts[2]++;
            else if (in.f2.startsWith("2")) counts[3]++;
        }
        if (in.f3 != null) {                           // f3 = HTTP method
            if (in.f3.equalsIgnoreCase("GET"))       counts[4]++;
            else if (in.f3.equalsIgnoreCase("POST")) counts[5]++;
            else if (in.f3.equalsIgnoreCase("PUT"))  counts[6]++;
            else if (in.f3.equalsIgnoreCase("HEAD")) counts[7]++;
        }
    }
...
}





--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Count-Different-Codes-in-a-Window-tp14391.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Count Different Codes in a Window

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Raj,

I would recommend using a ReduceFunction instead of a WindowFunction. The
benefit of a ReduceFunction is that it is applied eagerly whenever an
element is added to the window, so the window state holds only a single
element (the running aggregate). In contrast, a WindowFunction collects all
elements of a window in state and is only applied when the window is closed [1].
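A plain-Java sketch of this difference follows. It has no Flink dependency, and `WindowStateSketch`, `BufferingWindow`, and `ReducingWindow` are hypothetical names; they only model how much state each strategy keeps for a single window, not how Flink implements window state internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical model of per-window state; not Flink's actual implementation.
class WindowStateSketch {

    // WindowFunction-style: every element is kept until the window closes.
    static final class BufferingWindow<T> {
        final List<T> buffer = new ArrayList<>();
        void add(T element) { buffer.add(element); }
        int stateSize() { return buffer.size(); }
    }

    // ReduceFunction-style: each element is combined eagerly with one accumulator.
    static final class ReducingWindow<T> {
        private final BinaryOperator<T> reducer;
        private T accumulator; // null until the first element arrives

        ReducingWindow(BinaryOperator<T> reducer) { this.reducer = reducer; }

        void add(T element) {
            accumulator = (accumulator == null) ? element : reducer.apply(accumulator, element);
        }
        T result() { return accumulator; }
        int stateSize() { return accumulator == null ? 0 : 1; }
    }

    public static void main(String[] args) {
        BufferingWindow<Long> buffering = new BufferingWindow<>();
        ReducingWindow<Long> reducing = new ReducingWindow<>(Long::sum);
        for (long i = 1; i <= 1000; i++) {
            buffering.add(i);
            reducing.add(i);
        }
        System.out.println(buffering.stateSize()); // 1000 elements held
        System.out.println(reducing.stateSize());  // 1 element held
        System.out.println(reducing.result());     // 500500
    }
}
```

With 1,000 elements the buffering variant keeps all 1,000 in state, while the reducing variant keeps a single accumulator that already contains the final sum.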

For that, I would evaluate the conditions in a MapFunction before the
window and emit, for each condition, a 1 if it is true and a 0 if it is false.
This results in a vector of 1s and 0s, e.g., (0, 1, 0, 0, 1, 0, 0).
The ReduceFunction then simply sums these vectors element-wise to compute the counts.
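The map-then-reduce idea can be sketched in plain Java as below. This is an illustration under assumptions: the counter layout (index 0 = total, 1-3 = 5xx/4xx/2xx, 4-7 = GET/POST/PUT/HEAD) is taken from the loop in the original question, and `StatusCountSketch`, `indicators`, and `sum` are hypothetical names. In a Flink job, `indicators` would roughly correspond to the body of the MapFunction applied before the window, and `sum` to the ReduceFunction.

```java
import java.util.Arrays;

// Plain-Java sketch of "map to a 0/1 vector, then reduce by element-wise sum".
// Hypothetical names; no Flink dependency.
class StatusCountSketch {

    // Counter layout mirrors the original loop:
    // 0 = total, 1 = 5xx, 2 = 4xx, 3 = 2xx, 4 = GET, 5 = POST, 6 = PUT, 7 = HEAD
    static final int NUM_COUNTERS = 8;

    // "Map" step: turn one record into a vector of 1s and 0s.
    static long[] indicators(String status, String method) {
        long[] v = new long[NUM_COUNTERS]; // all zeros initially
        v[0] = 1;                          // every record counts toward the total
        if (status != null) {
            if (status.startsWith("5")) v[1] = 1;
            else if (status.startsWith("4")) v[2] = 1;
            else if (status.startsWith("2")) v[3] = 1;
        }
        if (method != null) {
            if (method.equalsIgnoreCase("GET")) v[4] = 1;
            else if (method.equalsIgnoreCase("POST")) v[5] = 1;
            else if (method.equalsIgnoreCase("PUT")) v[6] = 1;
            else if (method.equalsIgnoreCase("HEAD")) v[7] = 1;
        }
        return v;
    }

    // "Reduce" step: element-wise sum of two count vectors.
    static long[] sum(long[] a, long[] b) {
        long[] out = new long[a.length];
        for (int i = 0; i < a.length; i++) {
            out[i] = a[i] + b[i];
        }
        return out;
    }

    public static void main(String[] args) {
        String[][] records = {
            {"200", "GET"}, {"503", "POST"}, {"404", "GET"}, {null, "HEAD"}
        };
        long[] counts = null;
        for (String[] r : records) {
            long[] v = indicators(r[0], r[1]);
            counts = (counts == null) ? v : sum(counts, v); // fold eagerly
        }
        System.out.println(Arrays.toString(counts)); // [4, 1, 1, 1, 2, 1, 0, 1]
    }
}
```

The reduce step returns a new array instead of mutating its inputs, which keeps it side-effect free. Because element-wise addition is associative and commutative, the final counts do not depend on the order in which records are combined.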

You might also want to have a look at the Table API [2], which would make
this even easier.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#reducefunction
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/tableApi.html#group-windows


Re: Count Different Codes in a Window

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Raj,

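You can use a ReduceFunction in combination with a WindowFunction [1]. The
ReduceFunction aggregates the elements incrementally, and the WindowFunction
is then called once per window with the pre-aggregated result and the
TimeWindow object, so you can read the window start time via
TimeWindow.getStart().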
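The following plain-Java simulation sketches how the two functions cooperate for 10-minute tumbling windows. It does not use Flink: `ReduceWithWindowSketch`, `reduce`, `windowFunction`, and `process` are hypothetical names, the counts are a simplified [total, 5xx] pair, and windows are assumed to be epoch-aligned (start = timestamp - timestamp % size, for non-negative timestamps). In Flink itself the corresponding wiring would be `WindowedStream.reduce(ReduceFunction, WindowFunction)`, whose WindowFunction receives the `TimeWindow` and can call `getStart()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of "incremental reduce + window function".
// Hypothetical names; not Flink API.
class ReduceWithWindowSketch {

    static final long WINDOW_SIZE_MS = 10 * 60 * 1000L; // 10 minutes

    // Start of the tumbling window containing ts (assumes ts >= 0, no offset).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    // Incremental reduce: element-wise sum of [total, 5xx] count pairs.
    static long[] reduce(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }

    // "Window function": gets the single pre-aggregated value plus the
    // window bounds, so it can attach the window start to its output.
    static String windowFunction(long start, long end, long[] counts) {
        return "window [" + start + ", " + end + "): total=" + counts[0] + ", 5xx=" + counts[1];
    }

    // Assumes non-null status strings.
    static List<String> process(long[] timestamps, String[] statuses) {
        // One accumulator per window, keyed (and sorted) by window start.
        Map<Long, long[]> state = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long[] one = {1, statuses[i].startsWith("5") ? 1 : 0};
            state.merge(windowStart(timestamps[i]), one, ReduceWithWindowSketch::reduce);
        }
        // Fire every window once; a real stream would fire as time advances.
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, long[]> e : state.entrySet()) {
            long start = e.getKey();
            out.add(windowFunction(start, start + WINDOW_SIZE_MS, e.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 59_000L, 600_000L, 650_000L, 1_199_999L};
        String[] status = {"200", "503", "404", "500", "502"};
        process(ts, status).forEach(System.out::println);
        // window [0, 600000): total=2, 5xx=1
        // window [600000, 1200000): total=3, 5xx=2
    }
}
```

Only one accumulator per window is kept in state, yet the window function still sees the window bounds, which is what makes it possible to write the window start time to Elasticsearch alongside the counts.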

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#windowfunction-with-incremental-aggregation

2017-07-24 20:31 GMT+02:00 Raj Kumar <sm...@gmail.com>:

> Thanks, Fabian. That helped.
>
> But I want to access the window start time. AFAIK, a ReduceFunction cannot
> give me this detail because no TimeWindow object is passed to the reduce
> method. How can I achieve this?

Re: Count Different Codes in a Window

Posted by Raj Kumar <sm...@gmail.com>.
Thanks, Fabian. That helped.

But I want to access the window start time. AFAIK, a ReduceFunction cannot
give me this detail because no TimeWindow object is passed to the reduce
method. How can I achieve this?


