Posted to dev@flink.apache.org by "Rosenfeld, Viktor" <vi...@tu-berlin.de> on 2014/10/31 18:50:09 UTC

Hi / Aggregation support

Hi everybody,

First, I want to introduce myself to the community. I am a PhD student who wants to work with and improve Flink.

Second, I would like to start by improving aggregations. My first goal is to simplify the computation of a field average. Basically, I want to turn this plan:

    val input = env.fromCollection( Array(1L, 2L, 3L, 4L) )

    input
    .map { in => (in, 1L) }
    .sum(0).andSum(1)
    .map { in => in._1.toDouble / in._2.toDouble }
    .print

into this:

    // val input = ...
    input.average(0).print()

My basic idea is to internally still add the counter field and execute the map and sum steps but to hide them from the user.

Next, I want to support multiple aggregations so one can write something like:

    input.min(0).max(0).sum(0).count(0).average(0)

Internally, there should only be one pass over the input data and average should reuse the work done by sum and count.
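
As a rough illustration of the single-pass idea, here is a minimal plain-Java sketch with no Flink dependency. The class and method names (SinglePassAggregates, aggregate, average) are invented for this example and are not part of any Flink API. It only shows that min, max, sum, and count can be accumulated in one loop and that the average can then be derived from the sum and count that already exist.

```java
import java.util.List;

// Hypothetical illustration only; this is not Flink code.
final class SinglePassAggregates {
    final long min;
    final long max;
    final long sum;
    final long count;

    private SinglePassAggregates(long min, long max, long sum, long count) {
        this.min = min;
        this.max = max;
        this.sum = sum;
        this.count = count;
    }

    // Computes all aggregates in a single pass; the input must not be empty.
    static SinglePassAggregates aggregate(List<Long> input) {
        if (input.isEmpty()) {
            throw new IllegalArgumentException("input must not be empty");
        }
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        long sum = 0L;
        long count = 0L;
        for (long value : input) {
            min = Math.min(min, value);
            max = Math.max(max, value);
            sum += value;
            count++;
        }
        return new SinglePassAggregates(min, max, sum, count);
    }

    // The average reuses the sum and count computed above; no second pass.
    double average() {
        return (double) sum / count;
    }
}
```

For the input 1, 2, 3, 4 from the example above, this yields a sum of 10, a count of 4, and therefore an average of 2.5.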

In September there was some discussion [1] on the semantics of the min/max aggregations vs. minBy/maxBy. The consensus was that min/max should not simply return the respective field value but return the entire tuple. However, for count/sum/average there is no specific tuple and it would also not work for combinations of min/max.

One possible route is to simply return a random element, similar to MySQL. I think this can be very surprising to the user, especially when min/max are combined.

Another possibility is to return the tuple only for single invocations of min or max and return the field value for the other aggregation functions or combinations. This is also inconsistent but appears to be more in line with people's expectations. Also, there might be two or more tuples with the same min/max value, and then the question is which one should be returned.
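
The difference between the two return styles, and the tie question, can be sketched in plain Java. This is only an illustration under assumptions: the names MinVersusMinBy, minValue, and minBy are made up here, tuples are modelled as Map.Entry pairs, and breaking ties by keeping the first tuple seen is just one possible rule, not something the thread has decided.

```java
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; this is not Flink code.
final class MinVersusMinBy {

    // "Field value" style: only the smallest value of the field survives.
    // Returns Integer.MAX_VALUE for an empty input.
    static int minValue(List<Map.Entry<String, Integer>> tuples) {
        int min = Integer.MAX_VALUE;
        for (Map.Entry<String, Integer> tuple : tuples) {
            min = Math.min(min, tuple.getValue());
        }
        return min;
    }

    // "Entire tuple" style: the whole tuple holding the smallest value is
    // returned. On ties the first tuple seen wins (an assumption of this
    // sketch). Returns null for an empty input.
    static Map.Entry<String, Integer> minBy(List<Map.Entry<String, Integer>> tuples) {
        Map.Entry<String, Integer> best = null;
        for (Map.Entry<String, Integer> tuple : tuples) {
            if (best == null || tuple.getValue() < best.getValue()) {
                best = tuple;
            }
        }
        return best;
    }
}
```

With the tuples (a, 3), (b, 1), (c, 1), minValue gives 1, while minBy has to pick between (b, 1) and (c, 1); under the first-seen rule it returns (b, 1).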

I haven't yet thought about aggregations in a streaming context and I would appreciate any input on this.

Best,
Viktor

[1] http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Aggregations-td1706.html


Re: Hi / Aggregation support

Posted by Fabian Hueske <fh...@apache.org>.
Hi Viktor,

thanks for the update!

Regarding the explicit vs. implicit adding of key fields:
I would only allow either key(x) or allKeys() to be used and throw an
exception if they are mixed.
I guess there won't be many situations where somebody would want to mix
them anyway.
No need to complicate the logic and semantics for this corner case, IMO.
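
A check of this kind could look roughly like the following plain-Java sketch. Everything in it is hypothetical: the Kind enum, the validate method, and the choice of IllegalArgumentException are assumptions made for illustration and do not reflect the actual code in the branch.

```java
import java.util.List;

// Hypothetical illustration only; this is not Flink code.
final class KeySelectionCheck {

    // Invented markers for the entries passed to an aggregate(...) call.
    enum Kind { KEY, ALL_KEYS, AGGREGATE }

    // Rejects a specification that mixes key(x) with allKeys().
    static void validate(List<Kind> functions) {
        boolean hasKey = functions.contains(Kind.KEY);
        boolean hasAllKeys = functions.contains(Kind.ALL_KEYS);
        if (hasKey && hasAllKeys) {
            throw new IllegalArgumentException(
                "key(x) and allKeys() must not be mixed in one aggregation");
        }
    }
}
```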

Not sharing the result of two identical operations is fine. Whoever
computes
ds.aggregate(min(0), min(0))
deserves the overhead ;-)

We have the goal of keeping the Java and Scala APIs in sync at any point in
time.
You can open a pull request with only the Java changes, but it won't be
merged until you (or somebody else) have adapted the Scala API.
I would say, do the PR and start a discussion about that.
This way, everybody can review the code more easily.

I'll have a detailed look at the changes later.

Cheers, Fabian

2014-11-27 11:20 GMT+01:00 Viktor Rosenfeld <vi...@tu-berlin.de>:

> Hi Fabian,
>
> thanks for your feedback. See my responses below.
>
>
> Fabian Hueske wrote
> > - I would split the branch into two branches, one for each approach. That
> > makes comparisons with master much easier.
>
> I've moved the changes necessary for the second approach to a branch called
> aggregation-alt:
> https://github.com/he-sk/incubator-flink/tree/aggregation-alt
>
>
> Fabian Hueske wrote
> > - I am not sure about the implicit adding of key fields if they are not
> > explicitly added by the user in the aggregation. It might confuse users
> if
> > the return type looks different from what they have specified. How about
> > having an allKeys() function that adds all keys of the grouping and not
> > adding keys by default?
>
> Done. But I'm not sure about it.
>
> It is not very clear where in the result the key fields should be added.
> The
> old code added them at the beginning. I'm now inserting them at the
> position
> where the allKeys() function is called except for those keys that are
> explicitly specified elsewhere. All in all, I think that the semantics are
> very
> opaque.
>
>
> Fabian Hueske wrote
> > - DataSet and UnorderedGrouping expose getter and setter methods for the
> > AggregationOperatorFactory. These methods are public and therefore facing
> > the user API. Can you make them private or even remove them? They are not
> > really necessary, right?
>
> I need the setter to test the delegation in DataSet.aggregate(). The test
> is
> fairly trivial but now that it's there, why remove it? I've made the
> getters
> and setters package private.
>
>
> Fabian Hueske wrote
> > - The aggregation GroupReduceFunction should be combinable to make it
> > perform better, esp. in case of aggregations on ungrouped datasets. It
> > would be even better if you could convert the GroupReduceFunction into a
> > functional-style ReduceFunction. These functions are always combinable and
> > can be executed using a hash-aggregation strategy once we have that
> > feature implemented (again better performance). However, for that you
> > would need to have pre- and postprocessing MapFunctions (initialize and
> > finalize of aggregates). On the other hand, you only need three aggregation
> > functions: sum, min, and max (count is sum of ones, avg is sum/count). This
> > design also eases the sharing of count for multiple avg aggregations.
>
> The GroupReduce cannot be made combinable because it changes the output
> tuple
> type. CombineFunction.combine() requires that both the input and the output
> type are the same.
>
> I changed the implementation to use 2 MapFunctions and a ReduceFunction.
>
> Also, I implemented average so that it picks up an existing count and sum.
> However, if the same function is specified multiple times (e.g., 2 calls to
> min(0) in one aggregate) it won't be reused. The reason is that every
> function
> stores only one position of the result in the output tuple. (But two
> average(0)
> functions will use the same count and sum functions because the result of
> count
> and sum is not represented in the output tuple.)
>
>
> Fabian Hueske wrote
> > - Some integration test cases would also be nice. See for example the
> > tests
> > in org.apache.flink.test.javaApiOperators.*
>
> I've copied the tests in AggregateITCase and SumMinMaxITCase for that.
>
>
> Fabian Hueske wrote
> > - We do not use @author tags in our code.
>
> Removed.
>
>
> Fabian Hueske wrote
> > - Finally, we try to keep the documentation in sync with the code. Once
> > your changes are ready for a PR, you should adapt the documentation in
> > ./docs according to your changes (no need to do it at this point).
> >
> > Please let me know if you have any questions.
>
> Do you think that for a pull request the implementation of the Scala API is
> necessary? Or should I create a pull request from the current code?
>
> Cheers,
> Viktor
>
>
>
>
> --
> View this message in context:
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-tp2311p2643.html
> Sent from the Apache Flink (Incubator) Mailing List archive. mailing list
> archive at Nabble.com.
>

Re: Hi / Aggregation support

Posted by Aljoscha Krettek <al...@apache.org>.
Ahh, I didn't see that. My bad.

On Thu, Nov 27, 2014 at 11:47 AM, Fabian Hueske <fh...@apache.org> wrote:
> Viktor said he changed the implementation to
> MapFunction -> ReduceFunction -> MapFunction.
>
> So it is combinable :-)
>
> 2014-11-27 11:45 GMT+01:00 Aljoscha Krettek <al...@apache.org>:
>
>> Hi,
>> why does the GroupReduce change the output type? Can this not be done
>> in the two mappers? In my opinion, aggregations should be combinable,
>> otherwise, performance would be severely crippled.
>>
>> Cheers,
>> Aljoscha

Re: Hi / Aggregation support

Posted by Fabian Hueske <fh...@apache.org>.
Viktor said he changed the implementation to
MapFunction -> ReduceFunction -> MapFunction.

So it is combinable :-)

2014-11-27 11:45 GMT+01:00 Aljoscha Krettek <al...@apache.org>:

> Hi,
> why does the GroupReduce change the output type? Can this not be done
> in the two mappers? In my opinion, aggregations should be combinable,
> otherwise, performance would be severely crippled.
>
> Cheers,
> Aljoscha

Re: Hi / Aggregation support

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
why does the GroupReduce change the output type? Can this not be done
in the two mappers? In my opinion, aggregations should be combinable,
otherwise, performance would be severely crippled.

Cheers,
Aljoscha


Re: Hi / Aggregation support

Posted by Viktor Rosenfeld <vi...@tu-berlin.de>.
Hi Fabian,

thanks for your feedback. See my responses below.


Fabian Hueske wrote
> - I would split the branch into two branches, one for each approach. That
> makes comparisons with master much easier.

I've moved the changes necessary for the second approach to a branch called
aggregation-alt:
https://github.com/he-sk/incubator-flink/tree/aggregation-alt


Fabian Hueske wrote
> - I am not sure about the implicit adding of key fields if they are not
> explicitly added by the user in the aggregation. It might confuse users if
> the return type looks different from what they have specified. How about
> having an allKeys() function that adds all keys of the grouping and not
> adding keys by default?

Done. But I'm not sure about it. 

It is not very clear where in the result the key fields should be added. The
old code added them at the beginning. I'm now inserting them at the position
where the allKeys() function is called, except for those keys that are
explicitly specified elsewhere. All in all, I think that the semantics are
very opaque.
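
One way to read the placement rule described above is the following plain-Java sketch. It is an interpretation, not the code from the branch: the class AllKeysExpansion, the expand method, and the use of strings such as "key(0)" and "allKeys()" as stand-ins for aggregation function objects are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; this is not Flink code.
final class AllKeysExpansion {

    static final String ALL_KEYS = "allKeys()";

    // Expands allKeys() in place: every group key that is not explicitly
    // listed elsewhere in the specification is inserted where allKeys()
    // appears. All other entries keep their position.
    static List<String> expand(List<String> spec, int[] groupKeys) {
        List<String> result = new ArrayList<>();
        for (String function : spec) {
            if (!function.equals(ALL_KEYS)) {
                result.add(function);
                continue;
            }
            for (int keyField : groupKeys) {
                String key = "key(" + keyField + ")";
                if (!spec.contains(key)) {
                    result.add(key);
                }
            }
        }
        return result;
    }
}
```

For a grouping on fields 0 and 1, the specification min(2), allKeys(), key(1) would expand to min(2), key(0), key(1) under this reading: key(0) lands where allKeys() was called, and key(1) stays where it was given explicitly.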


Fabian Hueske wrote
> - DataSet and UnorderedGrouping expose getter and setter methods for the
> AggregationOperatorFactory. These methods are public and therefore facing
> the user API. Can you make them private or even remove them? They are not
> really necessary, right?

I need the setter to test the delegation in DataSet.aggregate(). The test is
fairly trivial but now that it's there, why remove it? I've made the getters
and setters package private.


Fabian Hueske wrote
> - The aggregation GroupReduceFunction should be combinable to make it
> perform better, esp. in case of aggregations on ungrouped datasets. It
> would be even better if you could convert the GroupReduceFunction into a
> functional-style ReduceFunction. These functions are always combinable and
> can be executed using a hash-aggregation strategy once we have that
> feature implemented (again better performance). However, for that you
> would need to have pre- and postprocessing MapFunctions (initialize and
> finalize of aggregates). On the other hand, you only need three aggregation
> functions: sum, min, and max (count is sum of ones, avg is sum/count). This
> design also eases the sharing of count for multiple avg aggregations.

The GroupReduce cannot be made combinable because it changes the output
tuple type. CombineFunction.combine() requires that both the input and the
output type are the same.

I changed the implementation to use 2 MapFunctions and a ReduceFunction.

Also, I implemented average so that it picks up an existing count and sum.
However, if the same function is specified multiple times (e.g., 2 calls to
min(0) in one aggregate) it won't be reused. The reason is that every
function stores only one position of the result in the output tuple. (But two
average(0) functions will use the same count and sum functions because the
result of count and sum is not represented in the output tuple.)
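
The map, reduce, map structure can be sketched in plain Java as follows. This is not the code from the branch: the class name MapReduceMapSketch, the method names, and the accumulator layout {min, max, sum, count} are assumptions made for illustration. The point of the sketch is that the reduce step takes and returns the same accumulator type, so partial results can be merged in any grouping, and that the average is produced only in the final map from the shared sum and count.

```java
import java.util.List;

// Hypothetical illustration only; this is not Flink code.
final class MapReduceMapSketch {

    // First map: turn one input value into an accumulator.
    // Layout (an assumption of this sketch): {min, max, sum, count}.
    static long[] init(long value) {
        return new long[] {value, value, value, 1L};
    }

    // Reduce: input and output have the same type, so partial results can
    // be merged before the final reduce (this is what makes it combinable).
    static long[] reduce(long[] a, long[] b) {
        return new long[] {
            Math.min(a[0], b[0]),
            Math.max(a[1], b[1]),
            a[2] + b[2],
            a[3] + b[3]
        };
    }

    // Second map: build the result {min, max, sum, count, average}.
    // The average is derived from the shared sum and count.
    static double[] finish(long[] acc) {
        return new double[] {acc[0], acc[1], acc[2], acc[3], (double) acc[2] / acc[3]};
    }

    // Applies init and reduce to every value; returns null for empty input.
    static long[] reduceAll(List<Long> values) {
        long[] acc = null;
        for (long value : values) {
            long[] next = init(value);
            acc = (acc == null) ? next : reduce(acc, next);
        }
        return acc;
    }
}
```

Reducing 1, 2, 3, 4 in one go gives the same accumulator as reducing 1, 2 and 3, 4 separately and then merging the two partial results, which is the property a combiner relies on.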


Fabian Hueske wrote
> - Some integration test cases would also be nice. See for example the
> tests
> in org.apache.flink.test.javaApiOperators.*

I've copied the tests in AggregateITCase and SumMinMaxITCase for that.


Fabian Hueske wrote
> - We do not use @author tags in our code.

Removed.


Fabian Hueske wrote
> - Finally, we try to keep the documentation in sync with the code. Once
> your changes are ready for a PR, you should adapt the documentation in
> ./docs according to your changes (no need to do it at this point).
> 
> Please let me know if you have any questions.

Do you think that for a pull request the implementation of the Scala API is
necessary? Or should I create a pull request from the current code?

Cheers,
Viktor





Re: Hi / Aggregation support

Posted by Fabian Hueske <fh...@apache.org>.
Hi Viktor,

I had a look at your branch.
First of all, it looks like very good work! Good code quality, lots of
tests, well documented, nice!
I like the first approach (ds.aggregate(min(1), max(2), count())) much
better than the other one. It basically shows how the result tuple is
constructed.

I also have a few comments on the code and the overall approach:
- I would split the branch into two branches, one for each approach. That
makes comparisons with master much easier.
- I am not sure about the implicit adding of key fields if they are not
explicitly added by the user in the aggregation. It might confuse users if
the return type looks different from what they have specified. How about
having an allKeys() function that adds all keys of the grouping and not
adding keys by default?
- DataSet and UnorderedGrouping expose getter and setter methods for the
AggregationOperatorFactory. These methods are public and therefore facing
the user API. Can you make them private or even remove them? They are not
really necessary, right?
- The aggregation GroupReduceFunction should be combinable to make it
perform better, esp. in case of aggregations on ungrouped datasets. It
would be even better if you could convert the GroupReduceFunction into a
functional-style ReduceFunction. These functions are always combinable and
can be executed using a hash-aggregation strategy once we have that feature
implemented (again better performance). However, for that you would need to
have pre- and postprocessing MapFunctions (initialize and finalize of
aggregates). On the other hand, you only need three aggregation functions:
sum, min, and max (count is sum of ones, avg is sum/count). This design
also eases the sharing of count for multiple avg aggregations.
- Some integration test cases would also be nice. See for example the tests
in org.apache.flink.test.javaApiOperators.*
- We do not use @author tags in our code.
- Finally, we try to keep the documentation in sync with the code. Once
your changes are ready for a PR, you should adapt the documentation in
./docs according to your changes (no need to do it at this point).

Please let me know if you have any questions.

Cheers, Fabian




2014-11-19 16:41 GMT+01:00 Viktor Rosenfeld <vi...@tu-berlin.de>:

> Hi everybody,
>
> I've created a GitHub branch for the new aggregation code:
> https://github.com/he-sk/incubator-flink/tree/aggregation
>
> I have implemented both of the APIs that I proposed earlier, so people can
> play around and decide which they like better:
>
>   DataSet ds = ...
>   ds.groupBy(0).aggregate(min(1), max(1), count())
>
> And:
>
>   DataSet ds = ...
>   ds.groupBy(0).min(1).max(1).count().aggregate()
>
> The second version is a thin layer on the first version.
>
> The aggregation functions min, max, sum, count, and average are supported.
> For groupings, you can select the group keys with (multiple) key()
> pseudo-aggregation functions. By default, all group keys are used.
>
> You can find examples in AggregationApi1Test.java and
> AggregationApi2Test.java.
>
> Right now, only the Java API uses the new aggregation code. I've only
> started learning Scala so I don't know how easy it will be to port the new
> API. One problem that I foresee is that the type information of the input
> tuples is lost. Therefore, the Scala compiler cannot do type inference on
> the output tuple. I hope that this can be fixed or worked around by simply
> specifying the output tuple type directly.
>
> I've kept the old aggregation API but marked it deprecated and renamed some
> functions.
>
> The next steps would be:
>
> 1) Implement Scala API.
> 2) Add support for POJOs (sync with streaming aggregations for that).
>
> Looking forward to your input.
>
> Best,
> Viktor
>
>
>
> --
> View this message in context:
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-tp2311p2547.html
> Sent from the Apache Flink (Incubator) Mailing List archive. mailing list
> archive at Nabble.com.
>

Re: Hi / Aggregation support

Posted by Viktor Rosenfeld <vi...@tu-berlin.de>.
Hi everybody,

I've created a GitHub branch for the new aggregation code:
https://github.com/he-sk/incubator-flink/tree/aggregation

I have implemented both of the APIs that I proposed earlier, so people can
play around and decide which they like better:

  DataSet ds = ...
  ds.groupBy(0).aggregate(min(1), max(1), count())

And: 

  DataSet ds = ...
  ds.groupBy(0).min(1).max(1).count().aggregate()

The second version is a thin layer on the first version.
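
How the chained style can sit on top of the varargs style is sketched below in plain Java. This is not the code from the branch: the class AggregationApiSketch, the nested Chain builder, and the use of plain strings as stand-ins for aggregation function objects are assumptions made for illustration. In real code the factory methods min, max, and count would presumably be made available to callers through static imports.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; this is not Flink code.
final class AggregationApiSketch {

    // Factory methods; strings stand in for aggregation function objects.
    static String min(int field) { return "min(" + field + ")"; }
    static String max(int field) { return "max(" + field + ")"; }
    static String count() { return "count()"; }

    // First style: all functions are passed in one call. The returned list
    // mirrors the layout of the result tuple.
    static List<String> aggregate(String... functions) {
        return new ArrayList<>(Arrays.asList(functions));
    }

    // Second style: a builder that only collects functions and then
    // delegates to the first style.
    static final class Chain {
        private final List<String> collected = new ArrayList<>();

        Chain min(int field) { collected.add(AggregationApiSketch.min(field)); return this; }
        Chain max(int field) { collected.add(AggregationApiSketch.max(field)); return this; }
        Chain count() { collected.add(AggregationApiSketch.count()); return this; }

        List<String> aggregate() {
            return AggregationApiSketch.aggregate(collected.toArray(new String[0]));
        }
    }

    static Chain chain() { return new Chain(); }
}
```

In this sketch, aggregate(min(1), max(1), count()) and chain().min(1).max(1).count().aggregate() produce the same result layout, because the chained form does nothing more than gather its arguments before calling the varargs form.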

The aggregation functions min, max, sum, count, and average are supported.
For groupings, you can select the group keys with (multiple) key()
pseudo-aggregation functions. By default, all group keys are used.

You can find examples in AggregationApi1Test.java and
AggregationApi2Test.java.

Right now, only the Java API uses the new aggregation code. I've only
started learning Scala so I don't know how easy it will be to port the new
API. One problem that I foresee is that the type information of the input
tuples is lost. Therefore, the Scala compiler cannot do type inference on
the output tuple. I hope that this can be fixed or worked around by simply
specifying the output tuple type directly.

I've kept the old aggregation API but marked it deprecated and renamed some
functions.

The next steps would be:

1) Implement Scala API.
2) Add support for POJOs (sync with streaming aggregations for that).

Looking forward to your input.

Best,
Viktor




Re: Hi / Aggregation support

Posted by Viktor Rosenfeld <vi...@tu-berlin.de>.
Yes, you would need static method imports.

Best,
Viktor


Stephan Ewen wrote
> I guess you would need static method imports to make the code look like
> this, which I think is fine.

--
View this message in context: http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-tp2311p2438.html

Re: Hi / Aggregation support

Posted by Stephan Ewen <se...@apache.org>.
I guess you would need static method imports to make the code look like
this, which I think is fine.

On Mon, Nov 10, 2014 at 12:00 PM, Fabian Hueske <fh...@apache.org> wrote:

> How/where do you plan to define the methods min(1), max(1), and cnt()?
> If these are static methods in some kind of Aggregation class, it won't
> look so concise anymore, or am I missing something here?
>
> I would be fine with both ways, the second one being nice, if it can be
> done like that.

Re: Hi / Aggregation support

Posted by Fabian Hueske <fh...@apache.org>.
How/where do you plan to define the methods min(1), max(1), and cnt()?
If these are static methods in some kind of Aggregation class, it won't
look so concise anymore, or am I missing something here?

I would be fine with both ways, the second one being nice, if it can be
done like that.

2014-11-10 11:03 GMT+01:00 Gyula Fora <gy...@apache.org>:

> I also support this approach:
>
>  ds.groupBy(0).aggregate(min(1), max(1), cnt())
>
> I think it makes the code more readable, because it is easy to see what's
> in the result tuple.
>
> Gyula

Re: Hi / Aggregation support

Posted by Gyula Fora <gy...@apache.org>.
I also support this approach:

 ds.groupBy(0).aggregate(min(1), max(1), cnt()) 

I think it makes the code more readable, because it is easy to see what's in the result tuple.

Gyula

> On 10 Nov 2014, at 10:49, Aljoscha Krettek <al...@apache.org> wrote:
> 
> I like this version: ds.groupBy(0).aggregate(min(1), max(1), cnt()),
> very concise.


Re: Hi / Aggregation support

Posted by Aljoscha Krettek <al...@apache.org>.
I like this version: ds.groupBy(0).aggregate(min(1), max(1), cnt()),
very concise.

On Mon, Nov 10, 2014 at 10:42 AM, Viktor Rosenfeld
<vi...@tu-berlin.de> wrote:
> Hi Fabian,
>
> I ran into a problem with your syntax example:
>
> DataSet<Tuple2<String, Integer>> ds = ...
> DataSet<Tuple4<Tuple2<String, Integer>, Integer, Integer, Long>> result =
> ds.groupBy(0).min(1).andMax(1).andCnt();
>
> Basically, in the example above we don't know how long the chain of
> aggregation method calls is. Also, each aggregation method call adds a
> field to the result tuple (the first call to groupBy returns a
> Tuple1). Because the resultType of an operator is specified in the
> constructor, every one of those method calls needs to create a new
> Operator<OUT> with the correct result type. However, only the
> translateToDataFlow method of the last method call in the chain should
> actually compute the aggregation.
>
> This can be achieved by testing if an aggregation method is called on
> an AggregationOperator. The translateToDataFlow method of the
> operators in the start/middle of the chain would then just return a
> MapOperatorBase which simply extends the tuple. The
> translateToDataFlow method of the last operator in the chain would
> return a GroupReduceOperatorBase.
>
> This strategy seems very hackish and involves lots of unnecessary
> copying of tuple data. I think a better way would be to use the
> following syntax:
>
> ds.groupBy(0).aggregate(min(1), max(1), cnt())
>
> or
>
> ds.groupBy(0).min(1).max(1).cnt(1).aggregate()
>
> Here, there is only one method which creates a new operator, the
> aggregate method, and the final resultType is known when aggregate is
> called.
>
> What do you think?
>
> Best,
> Viktor
>
>
>
> --
> View this message in context: http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-tp2311p2429.html
> Sent from the Apache Flink (Incubator) Mailing List archive. mailing list archive at Nabble.com.

Re: Hi / Aggregation support

Posted by Viktor Rosenfeld <vi...@tu-berlin.de>.
Hi Fabian,

I ran into a problem with your syntax example:

DataSet<Tuple2<String, Integer>> ds = ...
DataSet<Tuple4<Tuple2<String, Integer>, Integer, Integer, Long>> result =
ds.groupBy(0).min(1).andMax(1).andCnt();

Basically, in the example above we don't know how long the chain of
aggregation method calls is. Also, each aggregation method call adds a
field to the result tuple (the first call to groupBy returns a
Tuple1). Because the resultType of an operator is specified in the
constructor, every one of those method calls needs to create a new
Operator<OUT> with the correct result type. However, only the
translateToDataFlow method of the last method call in the chain should
actually compute the aggregation.

This can be achieved by testing if an aggregation method is called on
an AggregationOperator. The translateToDataFlow method of the
operators in the start/middle of the chain would then just return a
MapOperatorBase which simply extends the tuple. The
translateToDataFlow method of the last operator in the chain would
return a GroupReduceOperatorBase.

This strategy seems very hackish and involves lots of unnecessary
copying of tuple data. I think a better way would be to use the
following syntax:

ds.groupBy(0).aggregate(min(1), max(1), cnt())

or

ds.groupBy(0).min(1).max(1).cnt(1).aggregate()

Here, there is only one method which creates a new operator, the
aggregate method, and the final resultType is known when aggregate is
called.
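
To make the point concrete, here is a minimal sketch of how the chained style can sit on top of the varargs style. All names (ChainedAggregationSketch, Builder, AggregateOperator) are hypothetical and this is not Flink code; aggregations are represented as plain strings for brevity. The chained calls only record requests, and the single aggregate() call creates the one operator, at which point the full list of result fields is known.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical names throughout; this is not the code from the branch.
class ChainedAggregationSketch {

    /** Stand-in for an operator whose result fields are fixed when it is constructed. */
    static final class AggregateOperator {
        final List<String> resultFields;
        AggregateOperator(List<String> resultFields) {
            this.resultFields = Collections.unmodifiableList(new ArrayList<>(resultFields));
        }
    }

    /** Varargs style: the only place where an operator is created. */
    static AggregateOperator aggregate(String... aggregations) {
        return new AggregateOperator(Arrays.asList(aggregations));
    }

    /** Chained style: each call only records a request and returns the same builder. */
    static final class Builder {
        private final List<String> requested = new ArrayList<>();

        Builder min(int field) { requested.add("min(" + field + ")"); return this; }
        Builder max(int field) { requested.add("max(" + field + ")"); return this; }
        Builder cnt() { requested.add("cnt()"); return this; }

        /** Delegates to the varargs style once the full chain is known. */
        AggregateOperator aggregate() {
            return ChainedAggregationSketch.aggregate(requested.toArray(new String[0]));
        }
    }

    public static void main(String[] args) {
        AggregateOperator op = new Builder().min(1).max(1).cnt().aggregate();
        System.out.println(op.resultFields);
    }
}
```

Because no intermediate operator exists, nothing has to extend or copy tuples between the chained calls.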

What do you think?

Best,
Viktor



--
View this message in context: http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-tp2311p2429.html

Re: Hi / Aggregation support

Posted by Fabian Hueske <fh...@apache.org>.
That sounds good to me. Although making the key copy implicit might confuse
users who need to take that into account when specifying the type of data
sets of operators...

Taking the key(s) from the Grouping should not be a problem.

2014-11-04 15:12 GMT+01:00 viktor.rosenfeld <vi...@tu-berlin.de>:

> Hi Fabian,
>
>
> Fabian Hueske wrote
> > DataSet<Tuple2<String, Integer>> ds = ...
> > DataSet<Tuple4<String, Integer, Integer, Long>> result =
> > ds.groupBy(0).key(0).andMin(1).andMax(1).andCnt();
> >
> > The second example explicitly extracts the key
> > from the original input data.
>
> Wouldn't it make sense to use the call to groupBy() to also extract the key
> fields? So in your example, the call to key(0) is redundant. If there are
> multiple fields specified in groupBy() then all of them would be used as
> the key. If the user wants only specific keys, they can select them by
> explicitly calling the key() method. Specifying a field in key() that is
> not used in groupBy() would be an error. This is close to (proper) SQL
> semantics.
>
> What do you think?
>
> I'm not a big fan of how MySQL lets you specify attributes that are not
> grouped or averaged and returns a random element for them. (I think that's
> a bug in MySQL, although there's probably a reason for the behavior.)
>
> Best,
> Viktor
>
>
>
> --
> View this message in context:
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-tp2311p2359.html
> Sent from the Apache Flink (Incubator) Mailing List archive. mailing list
> archive at Nabble.com.
>

Re: Hi / Aggregation support

Posted by "viktor.rosenfeld" <vi...@tu-berlin.de>.
Hi Fabian,


Fabian Hueske wrote
> DataSet<Tuple2<String, Integer>> ds = ...
> DataSet<Tuple4<String, Integer, Integer, Long>> result =
> ds.groupBy(0).key(0).andMin(1).andMax(1).andCnt();
> 
> The second example explicitly extracts the key
> from the original input data.

Wouldn't it make sense to use the call to groupBy() to also extract the key
fields? So in your example, the call to key(0) is redundant. If there are
multiple fields specified in groupBy() then all of them would be used as the
key. If the user wants only specific keys, they can select them by
explicitly calling the key() method. Specifying a field in key() that is not
used in groupBy() would be an error. This is close to (proper) SQL
semantics.
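
As a minimal sketch of the rule I have in mind (the class and method names are hypothetical, not existing Flink API): with no key() call, all grouping fields are copied to the result; with explicit key() calls, each requested field must be one of the grouping fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating the proposed key() semantics; not part of Flink.
class KeySelectionSketch {

    /**
     * Returns the key fields to copy into the result.
     * No explicit keys: all grouping fields are used.
     * Explicit keys: each one must also be a grouping field.
     */
    static List<Integer> resolveKeys(List<Integer> groupFields, int... keyFields) {
        if (keyFields.length == 0) {
            return new ArrayList<>(groupFields);
        }
        List<Integer> keys = new ArrayList<>();
        for (int k : keyFields) {
            if (!groupFields.contains(k)) {
                throw new IllegalArgumentException(
                        "key(" + k + ") is not one of the groupBy() fields " + groupFields);
            }
            keys.add(k);
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(resolveKeys(Arrays.asList(0, 2)));     // defaults to all group fields
        System.out.println(resolveKeys(Arrays.asList(0, 2), 2));  // explicit subset
    }
}
```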

What do you think?

I'm not a big fan of how MySQL lets you specify attributes that are not
grouped or averaged and returns a random element for them. (I think that's a
bug in MySQL, although there's probably a reason for the behavior.)

Best,
Viktor



--
View this message in context: http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Hi-Aggregation-support-tp2311p2359.html

Re: Hi / Aggregation support

Posted by Fabian Hueske <fh...@apache.org>.
Hi Viktor,

welcome to the dev mailing list! :-)

I agree that Flink's aggregations should be improved in various aspects:
- support more aggregation functions. Currently only MIN, MAX, SUM are
supported. Adding COUNT and AVG would be nice!
- support for multiple aggregations per field
- support for aggregations on POJO DataSets

How about always returning Tuples as the result of an aggregation? For
example, something like:

DataSet<Tuple2<String, Integer>> ds = ...
DataSet<Tuple4<Tuple2<String, Integer>, Integer, Integer, Long>> result =
ds.groupBy(0).min(1).andMax(1).andCnt();

or

DataSet<Tuple2<String, Integer>> ds = ...
DataSet<Tuple4<String, Integer, Integer, Long>> result =
ds.groupBy(0).key(0).andMin(1).andMax(1).andCnt();

In the first version, an arbitrary element of the group is added to the
result to identify the keys. The second example explicitly extracts the key
from the original input data. POJO data types can be handled similarly by
specifying the member fields to aggregate (or copy as key) by name.

Doing aggregations "in-place" within an input data type (and leaving other
fields untouched) could be a special variant of this operator.

2014-10-31 18:50 GMT+01:00 Rosenfeld, Viktor <vi...@tu-berlin.de>:

> Hi everybody,
>
> First, I want to introduce myself to the community. I am a PhD student who
> wants to work with and improve Flink.
>
> Second, I thought to work on improving aggregations as a start. My first
> goal is to simplify the computation of a field average. Basically, I want to
> turn this plan:
>
>     val input = env.fromCollection( Array(1L, 2L, 3L, 4L) )
>
>     input
>     .map { in => (in, 1L) }
>     .sum(0).andSum(1)
>     .map { in => in._1.toDouble / in._2.toDouble }
>     .print
>
> into this:
>
>     // val input = ...
>     input.average(0).print()
>
> My basic idea is to internally still add the counter field and execute the
> map and sum steps but to hide them from the user.
>
> Next, I want to support multiple aggregations so one can write something
> like:
>
>     input.min(0).max(0).sum(0).count(0).average(0)
>
> Internally, there should only be one pass over the input data and average
> should reuse the work done by sum and count.
>
> In September there was some discussion [1] on the semantics of the min/max
> aggregations vs. minBy/maxBy. The consensus was that min/max should not
> simply return the respective field value but return the entire tuple.
> However, for count/sum/average there is no specific tuple and it would also
> not work for combinations of min/max.
>
> One possible route is to simply return a random element, similar to MySQL.
> I think this can be very surprising to the user especially when min/max are
> combined.
>
> Another possibility is to return the tuple only for single invocations of
> min or max and return the field value for the other aggregation functions
> or combinations. This is also inconsistent but appears to be more in line with
> people's expectation. Also, there might be two or more tuples with the same
> min/max value and then the question is which should be returned.
>
> I haven't yet thought about aggregations in a streaming context and I
> would appreciate any input on this.
>
> Best,
> Viktor
>
> [1]
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/Aggregations-td1706.html
>
>