Posted to issues@flink.apache.org by GEOFBOT <gi...@git.apache.org> on 2016/06/16 13:16:58 UTC

[GitHub] flink pull request #2115: [FLINK-4017] [py] Add Aggregation support to Pytho...

GitHub user GEOFBOT opened a pull request:

    https://github.com/apache/flink/pull/2115

    [FLINK-4017] [py] Add Aggregation support to Python API

    Adds Aggregation support to the Python API, accessible through `.aggregate()` and `.agg_and()`. (I was unable to use `.and()` in Python because `and` is a reserved keyword.)
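Why `.and()` is not an option: `and` is a reserved keyword, so a method with that name cannot even be compiled. A quick standard-library check follows. The helper `is_valid_method_name` is illustrative only and is not part of Flink.

```python
import keyword

# 'and' is one of Python's reserved keywords.
print(keyword.iskeyword("and"))  # True


def is_valid_method_name(name):
    """Return True if `name` could be used in `def name(self): ...`."""
    # str.isidentifier() alone does not reject keywords, so check both.
    return name.isidentifier() and not keyword.iskeyword(name)


# Attempting to define a method named `and` fails at compile time.
try:
    compile("class C:\n    def and(self):\n        pass\n", "<example>", "exec")
    and_is_definable = True
except SyntaxError:
    and_is_definable = False

print(and_is_definable)                 # False
print(is_valid_method_name("and"))      # False
print(is_valid_method_name("and_agg"))  # True
```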

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/GEOFBOT/flink FLINK-4017

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2115.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2115
    
----
commit f3836f1e3919ca450baa3f633d9ece538008b106
Author: Geoffrey Mon <ge...@gmail.com>
Date:   2016-06-02T16:10:59Z

    [FLINK-4017] [py] Add Aggregation support to Python API
    
    Assembles and applies a GroupReduceFunction using pre-defined
    AggregationOperations.

----
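The commit message describes assembling one GroupReduceFunction from pre-defined aggregation operations. A toy sketch of that idea in plain Python, with no Flink dependency: each operation keeps a running value for one tuple field, and a single pass over the group updates all of them. The class names (`Sum`, `Min`, `Max`, `ToyAggregationFunction`), the `reduce(iterator, collector)` shape and the use of a plain list as the collector are assumptions for illustration, not the code from this commit.

```python
class AggregationOperation:
    """Hypothetical base class: folds the values of one tuple field."""

    def __init__(self, field):
        self.field = field
        self.value = None

    def aggregate(self, row):
        v = row[self.field]
        self.value = v if self.value is None else self.fold(self.value, v)


class Sum(AggregationOperation):
    def fold(self, a, b):
        return a + b


class Min(AggregationOperation):
    def fold(self, a, b):
        return min(a, b)


class Max(AggregationOperation):
    def fold(self, a, b):
        return max(a, b)


class ToyAggregationFunction:
    """Hypothetical group-reduce function that applies several
    (operation class, field) pairs in a single pass over a group."""

    def __init__(self, specs):
        self.specs = specs

    def reduce(self, iterator, collector):
        # Fresh operation state per group, so groups don't leak into each other.
        ops = [cls(field) for cls, field in self.specs]
        last = None
        for row in iterator:
            last = row
            for op in ops:
                op.aggregate(row)
        if last is None:
            return  # empty group: emit nothing
        # In this sketch, non-aggregated fields keep the last row's values.
        result = list(last)
        for op in ops:
            result[op.field] = op.value
        collector.append(tuple(result))


rows = [(1, "a", 5), (2, "b", 3), (3, "c", 4)]
out = []
ToyAggregationFunction([(Sum, 0), (Min, 2)]).reduce(iter(rows), out)
print(out)  # [(6, 'c', 3)]
```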


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2115: [FLINK-4017] [py] Add Aggregation support to Pytho...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2115



[GitHub] flink pull request #2115: [FLINK-4017] [py] Add Aggregation support to Pytho...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2115#discussion_r67940565
  
    --- Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py ---
    @@ -192,6 +193,30 @@ def reduce(self, operator):
             self._env._sets.append(child)
             return child_set
     
    +    def aggregate(self, aggregation, field):
    +        """
    +        Applies an Aggregate transformation (using a GroupReduceFunction) on a non-grouped Tuple DataSet.
    +        :param aggregation: The built-in aggregation function to apply on the DataSet.
    +        :param field: The index of the Tuple field on which to perform the function.
    +        :return: A GroupReduceOperator that represents the aggregated DataSet.
    +        """
    +        child_set = self.reduce_group(aggregation(field), combinable=True)
    --- End diff --
    
    There should also be a test case for a non-grouped aggregation.
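A sketch of what such a non-grouped test could check: the whole data set collapses into exactly one record whose aggregated fields are correct. It runs against a hypothetical in-memory `toy_aggregate` helper rather than the real Python API test harness, so it only illustrates the assertions, not the actual Flink test setup.

```python
def toy_aggregate(rows, specs):
    """Hypothetical non-grouped aggregate: one output tuple for the whole
    data set. `specs` is a list of (name, field), name in {sum, min, max}."""
    folds = {"sum": sum, "min": min, "max": max}
    rows = list(rows)
    if not rows:
        return []
    result = list(rows[-1])  # non-aggregated fields: last row, in this sketch
    for name, field in specs:
        result[field] = folds[name](r[field] for r in rows)
    return [tuple(result)]


def test_non_grouped_aggregation():
    data = [(1, 0.5, "hello", 4), (2, 1.5, "world", 7), (3, 2.5, "flink", 1)]
    result = toy_aggregate(data, [("sum", 0), ("min", 3)])
    # A non-grouped aggregation must collapse the whole data set to one record.
    assert len(result) == 1
    assert result[0][0] == 6  # 1 + 2 + 3
    assert result[0][3] == 1  # min(4, 7, 1)


test_non_grouped_aggregation()
print("ok")
```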


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2115: [FLINK-4017] [py] Add Aggregation support to Pytho...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2115#discussion_r67940433
  
    --- Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py ---
    @@ -192,6 +193,30 @@ def reduce(self, operator):
             self._env._sets.append(child)
             return child_set
     
    +    def aggregate(self, aggregation, field):
    +        """
    +        Applies an Aggregate transformation (using a GroupReduceFunction) on a non-grouped Tuple DataSet.
    +        :param aggregation: The built-in aggregation function to apply on the DataSet.
    +        :param field: The index of the Tuple field on which to perform the function.
    +        :return: A GroupReduceOperator that represents the aggregated DataSet.
    +        """
    +        child_set = self.reduce_group(aggregation(field), combinable=True)
    --- End diff --
    
    self.reduce_group(AggregationFunction(aggregation, field), combinable=True)
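One reading of this suggestion is that the wrapper should receive the aggregation and the field separately, so it can hold several (aggregation, field) pairs and still work with `combinable=True`. A minimal sketch under that assumption; `AggregationFunction`, `add_aggregation`, `combine` and the `SUM`/`MIN`/`MAX` constants here are hypothetical stand-ins, not the merged implementation.

```python
import operator

# Hypothetical aggregation "types": each is an associative two-argument fold.
SUM, MIN, MAX = operator.add, min, max


class AggregationFunction:
    """Hypothetical wrapper for the suggested call
    AggregationFunction(aggregation, field). It keeps the (aggregation, field)
    pairs rather than a pre-built function, so more pairs can be appended."""

    def __init__(self, aggregation, field):
        self.specs = [(aggregation, field)]

    def add_aggregation(self, aggregation, field):
        self.specs.append((aggregation, field))
        return self

    def _fold(self, rows):
        result = None
        for row in rows:
            if result is None:
                result = list(row)  # non-aggregated fields keep the first row's values
            else:
                for agg, field in self.specs:
                    result[field] = agg(result[field], row[field])
        return None if result is None else tuple(result)

    def combine(self, rows):
        # Pre-aggregate one partition; valid because every fold is associative.
        return self._fold(rows)

    def reduce(self, partials):
        # Merge the per-partition partial results with the same folds.
        return self._fold(partials)


f = AggregationFunction(SUM, 0).add_aggregation(MIN, 2)
p1 = f.combine([(1, "a", 5), (2, "b", 3)])
p2 = f.combine([(3, "c", 4)])
print(f.reduce([p1, p2]))  # (6, 'a', 3)
```

Combining first and then reducing the partials gives the same aggregated fields as folding all rows at once, which is the property a combinable group-reduce relies on.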



[GitHub] flink issue #2115: [FLINK-4017] [py] Add Aggregation support to Python API

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2115
  
    Looks good overall. I found one big issue; the rest are minor things.



[GitHub] flink pull request #2115: [FLINK-4017] [py] Add Aggregation support to Pytho...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2115#discussion_r67940988
  
    --- Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py ---
    @@ -192,6 +193,30 @@ def reduce(self, operator):
             self._env._sets.append(child)
             return child_set
     
    +    def aggregate(self, aggregation, field):
    +        """
    +        Applies an Aggregate transformation (using a GroupReduceFunction) on a non-grouped Tuple DataSet.
    +        :param aggregation: The built-in aggregation function to apply on the DataSet.
    +        :param field: The index of the Tuple field on which to perform the function.
    +        :return: A GroupReduceOperator that represents the aggregated DataSet.
    +        """
    +        child_set = self.reduce_group(aggregation(field), combinable=True)
    +        child_set._info.name = "PythonAggregate"
    +        return child_set
    +
    +    def agg_and(self, aggregation, field):
    +        """
    +        Applies an additional Aggregate transformation.
    +        :param aggregation: The built-in aggregation operation to apply on the DataSet.
    +        :param field: The index of the Tuple field on which to perform the function.
    +        :return: A GroupReduceOperator that represents the aggregated DataSet.
    +        """
    +        if self._info.name == "PythonAggregate":
    --- End diff --
    
    This is a bit icky. It would be better to create a separate AggregateOperator class that inherits from DataSet and contains the agg_and method (similar to how cross/join works).
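A rough sketch of the suggested design: `aggregate()` returns a dedicated `AggregateOperator` subclass, so the follow-up method only exists on aggregation results and no operator-name check is needed. These classes are hypothetical, heavily simplified stand-ins that compute eagerly on in-memory rows; the real DataSet builds a lazy plan.

```python
class DataSet:
    """Hypothetical, heavily simplified stand-in for the Python API's DataSet.
    It computes eagerly on in-memory rows instead of building a plan."""

    def __init__(self, rows):
        self.rows = list(rows)

    def aggregate(self, aggregation, field):
        # Returning a dedicated subclass means and_agg() only exists where it
        # makes sense, with no need to check an operator name string.
        return AggregateOperator(self, [(aggregation, field)])


class AggregateOperator(DataSet):
    def __init__(self, parent, specs):
        self.parent = parent
        self.specs = specs
        super().__init__(self._compute())

    def and_agg(self, aggregation, field):
        # Extend the same aggregation rather than stacking a second reduce
        # on top of the already-aggregated result.
        return AggregateOperator(self.parent, self.specs + [(aggregation, field)])

    def _compute(self):
        rows = self.parent.rows
        if not rows:
            return []
        result = list(rows[-1])  # non-aggregated fields: last row, in this sketch
        for aggregation, field in self.specs:
            result[field] = aggregation(r[field] for r in rows)
        return [tuple(result)]


ds = DataSet([(1, "a", 5), (2, "b", 3), (3, "c", 4)])
out = ds.aggregate(sum, 0).and_agg(min, 2)
print(out.rows)                # [(6, 'c', 3)]
print(hasattr(ds, "and_agg"))  # False
```

Chaining two separate `aggregate()` calls instead gives `[(6, 'c', 4)]` in this sketch, because the second aggregation only sees the already-aggregated row; that is why the extra aggregation has to extend the first one.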



[GitHub] flink issue #2115: [FLINK-4017] [py] Add Aggregation support to Python API

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2115
  
    merging



[GitHub] flink pull request #2115: [FLINK-4017] [py] Add Aggregation support to Pytho...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2115#discussion_r67950293
  
    --- Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py ---
    @@ -192,6 +193,30 @@ def reduce(self, operator):
             self._env._sets.append(child)
             return child_set
     
    +    def aggregate(self, aggregation, field):
    +        """
    +        Applies an Aggregate transformation (using a GroupReduceFunction) on a non-grouped Tuple DataSet.
    +        :param aggregation: The built-in aggregation function to apply on the DataSet.
    +        :param field: The index of the Tuple field on which to perform the function.
    +        :return: A GroupReduceOperator that represents the aggregated DataSet.
    +        """
    +        child_set = self.reduce_group(aggregation(field), combinable=True)
    +        child_set._info.name = "PythonAggregate"
    +        return child_set
    +
    +    def agg_and(self, aggregation, field):
    --- End diff --
    
    I would name this method `and_agg`.



[GitHub] flink issue #2115: [FLINK-4017] [py] Add Aggregation support to Python API

Posted by GEOFBOT <gi...@git.apache.org>.
Github user GEOFBOT commented on the issue:

    https://github.com/apache/flink/pull/2115
  
    I've addressed the documentation issue, thanks.



[GitHub] flink issue #2115: [FLINK-4017] [py] Add Aggregation support to Python API

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2115
  
    Thank you for your contribution, I will review this tomorrow :)



[GitHub] flink issue #2115: [FLINK-4017] [py] Add Aggregation support to Python API

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2115
  
    I'll have to try it out to be sure, but I can't see a problem looking through the code.



[GitHub] flink pull request #2115: [FLINK-4017] [py] Add Aggregation support to Pytho...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2115#discussion_r67941010
  
    --- Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py ---
    @@ -192,6 +193,30 @@ def reduce(self, operator):
             self._env._sets.append(child)
             return child_set
     
    +    def aggregate(self, aggregation, field):
    +        """
    +        Applies an Aggregate transformation (using a GroupReduceFunction) on a non-grouped Tuple DataSet.
    +        :param aggregation: The built-in aggregation function to apply on the DataSet.
    +        :param field: The index of the Tuple field on which to perform the function.
    +        :return: A GroupReduceOperator that represents the aggregated DataSet.
    +        """
    +        child_set = self.reduce_group(aggregation(field), combinable=True)
    +        child_set._info.name = "PythonAggregate"
    --- End diff --
    
    Let's include the aggregation type in the name.
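A small sketch of deriving a descriptive operator name from the aggregation types and fields, instead of the fixed "PythonAggregate". The `Aggregate(Sum(0), Min(2))` format and the empty placeholder classes are hypothetical choices for illustration, not the naming scheme the PR adopted.

```python
def aggregate_operator_name(specs):
    """Build a name such as 'Aggregate(Sum(0), Min(2))' from a list of
    (aggregation_class, field) pairs. The format is a hypothetical choice."""
    parts = ", ".join("{}({})".format(cls.__name__, field) for cls, field in specs)
    return "Aggregate({})".format(parts)


# Placeholder aggregation classes, only used for their names here.
class Sum:
    pass


class Min:
    pass


print(aggregate_operator_name([(Sum, 0), (Min, 2)]))  # Aggregate(Sum(0), Min(2))
```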



[GitHub] flink issue #2115: [FLINK-4017] [py] Add Aggregation support to Python API

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2115
  
    Found a small issue in the documentation, otherwise +1 to merge.



[GitHub] flink issue #2115: [FLINK-4017] [py] Add Aggregation support to Python API

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2115
  
    Please write a comment when you update the PR; we don't get any notifications for pushed commits :)



[GitHub] flink pull request #2115: [FLINK-4017] [py] Add Aggregation support to Pytho...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2115#discussion_r69535980
  
    --- Diff: docs/apis/batch/dataset_transformations.md ---
    @@ -1010,7 +1013,10 @@ val output = input.aggregate(SUM, 0).and(MIN, 2)
     <div data-lang="python" markdown="1">
     
     ~~~python
    -Not supported.
    +from flink.functions.Aggregation import Sum, Min
    +
    +input = # [...]
    +output = input.aggregate(Sum, 0).agg_and(Min, 2)
    --- End diff --
    
    The documentation wasn't updated for the rename to `and_agg`. The same applies to `python.md`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---