Posted to issues@flink.apache.org by shaoxuan-wang <gi...@git.apache.org> on 2017/04/24 16:39:15 UTC

[GitHub] flink pull request #3762: [FLINK-6361] [table] Refactoring the AggregateFunc...

GitHub user shaoxuan-wang opened a pull request:

    https://github.com/apache/flink/pull/3762

    [FLINK-6361] [table] Refactoring the AggregateFunction interface and built-in aggregates

    This PR includes the following changes:
    1) Remove the Accumulator trait.
    2) Move the accumulate, retract, merge, resetAccumulator, and getAccumulatorType methods out of the AggregateFunction interface, and allow them to be defined as contracted methods for UDAGGs.
    3) Refactor the built-in aggregates accordingly.
    4) Fix a build warning in flink/table/api/Types.scala (unrelated to FLINK-6361).
    
    
    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shaoxuan-wang/flink F6361-submit

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3762.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3762
    
----
commit b5806b1a3975186d69a76fd369ff77cf06e1e67f
Author: shaoxuan-wang <ws...@gmail.com>
Date:   2017-04-24T16:28:37Z

    [FLINK-6361] [table] Refactoring the AggregateFunction interface and built-in aggregates

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3762: [FLINK-6361] [table] Refactoring the AggregateFunc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3762#discussion_r113072758
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1232,12 +1266,20 @@ object AggregateUtil {
       }
     
       private def createAccumulatorType(
    -      aggregates: Array[TableAggregateFunction[_]]): Seq[TypeInformation[_]] = {
    +      aggregates: Array[TableAggregateFunction[_,_]]): Seq[TypeInformation[_]] = {
     
         val aggTypes: Seq[TypeInformation[_]] =
           aggregates.map {
             agg =>
    -          val accType = agg.getAccumulatorType
    +          var accType: TypeInformation[_] = null
    --- End diff --
    
    Change this to:
    ```
    val accType = try {
      val method: Method = agg.getClass.getMethod("getAccumulatorType")
      method.invoke(agg).asInstanceOf[TypeInformation[_]]
    } catch {
      case _: NoSuchMethodException => null
      case ite: Throwable => throw new TableException("Unexpected exception:", ite)
    }
    ```
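
The pattern suggested above, looking up an optional method by name and treating its absence as a normal case, can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the Flink code: `OptionalMethodLookup`, `WithAccType`, and `WithoutAccType` are hypothetical names, a `String` stands in for `TypeInformation`, and `RuntimeException` stands in for `TableException`. Unlike the Scala snippet, which catches any `Throwable`, this sketch only wraps the checked exceptions that `Method.invoke` declares.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class OptionalMethodLookup {

    // Hypothetical aggregate that declares the optional method.
    static class WithAccType {
        public String getAccumulatorType() {
            return "LONG"; // stand-in for a TypeInformation instance
        }
    }

    // Hypothetical aggregate that omits the optional method.
    static class WithoutAccType {
    }

    /**
     * Returns the result of a public no-arg getAccumulatorType() method,
     * or null if the object's class does not provide one.
     */
    static Object accumulatorTypeOrNull(Object agg) {
        try {
            Method method = agg.getClass().getMethod("getAccumulatorType");
            return method.invoke(agg);
        } catch (NoSuchMethodException e) {
            // The method is optional, so its absence is not an error.
            return null;
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException("Unexpected exception:", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(accumulatorTypeOrNull(new WithAccType()));
        System.out.println(accumulatorTypeOrNull(new WithoutAccType()));
    }
}
```

Returning `null` keeps the "method not declared" case separate from real failures, so the caller can decide how to derive the accumulator type when the method is missing.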



[GitHub] flink pull request #3762: [FLINK-6361] [table] Refactoring the AggregateFunc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3762#discussion_r113065861
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -471,31 +480,36 @@ class CodeGenerator(
               j"""
                  |    ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
                  |    ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField(${mapping(i)});
    -             |    accList$i.set(0, aAcc$i);
    -             |    accList$i.set(1, bAcc$i);
    -             |    a.setField(
    -             |      $i,
    -             |      ${aggs(i)}.merge(accList$i));
    +             |    accList$i.set(0, bAcc$i);
    +             |    ${aggs(i)}.merge(aAcc$i, accList$i);
    --- End diff --
    
    `ArrayList` creates a new `Iterator` object every time `ArrayList.iterator()` is called. We might want to implement our own single-element `Iterable` to avoid that overhead.
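
One way to avoid the per-call allocation is an `Iterable` that holds a single element and hands out the same iterator object on every call. The sketch below is an assumption about how such a class could look, not the code that was merged: the class name `SingleElementIterable` and the `setElement` method are chosen here for illustration.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

class SingleElementIterable<T> implements Iterable<T> {

    // Iterator that yields the enclosing instance's element exactly once per rewind.
    private final class SingleElementIterator implements Iterator<T> {
        private boolean consumed = true;

        @Override
        public boolean hasNext() {
            return !consumed;
        }

        @Override
        public T next() {
            if (consumed) {
                throw new NoSuchElementException();
            }
            consumed = true;
            return element;
        }
    }

    private T element;
    private final SingleElementIterator iterator = new SingleElementIterator();

    // Replaces the element that the next iteration will return.
    void setElement(T element) {
        this.element = element;
    }

    @Override
    public Iterator<T> iterator() {
        // Rewind and reuse the one iterator instead of allocating a new one.
        iterator.consumed = false;
        return iterator;
    }

    public static void main(String[] args) {
        SingleElementIterable<String> single = new SingleElementIterable<>();
        single.setElement("acc");
        for (String s : single) {
            System.out.println(s);
        }
    }
}
```

The trade-off is that the shared iterator is not safe for nested or concurrent iteration, which is acceptable only when each `merge` call consumes the iterable once before the next element is set.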



[GitHub] flink pull request #3762: [FLINK-6361] [table] Refactoring the AggregateFunc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3762#discussion_r113072949
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1281,7 +1323,7 @@ object AggregateUtil {
       }
     
       private[flink] def createAccumulatorRowType(
    -      aggregates: Array[TableAggregateFunction[_]]): RowTypeInfo = {
    +      aggregates: Array[TableAggregateFunction[_,_]]): RowTypeInfo = {
    --- End diff --
    
    Add a space after the comma: `TableAggregateFunction[_, _]`.



[GitHub] flink pull request #3762: [FLINK-6361] [table] Refactoring the AggregateFunc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3762#discussion_r113072931
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1259,7 +1301,7 @@ object AggregateUtil {
     
       private def createDataSetAggregateBufferDataType(
           groupings: Array[Int],
    -      aggregates: Array[TableAggregateFunction[_]],
    +      aggregates: Array[TableAggregateFunction[_,_]],
    --- End diff --
    
    Add a space after the comma: `TableAggregateFunction[_, _]`.



[GitHub] flink pull request #3762: [FLINK-6361] [table] Refactoring the AggregateFunc...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3762



[GitHub] flink pull request #3762: [FLINK-6361] [table] Refactoring the AggregateFunc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3762#discussion_r113066365
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala ---
    @@ -17,36 +17,93 @@
      */
     package org.apache.flink.table.functions
     
    -import java.util.{List => JList}
    -
    -import org.apache.flink.api.common.typeinfo.TypeInformation
    -import org.apache.flink.table.api.TableException
    -
     /**
       * Base class for User-Defined Aggregates.
       *
    -  * @tparam T the type of the aggregation result
    +  * The behavior of an [[AggregateFunction]] can be defined by implementing a series of custom
    +  * methods. An [[AggregateFunction]] needs at least three methods: createAccumulator, getValue,
    --- End diff --
    
    Make this a list. Scaladoc supports markdown-style list syntax:
    ```
    ... needs at least three methods: 
     - createAccumulator, 
     - accumulate, and
     - getValue.
    
    There are a few other methods that can be optional to have: 
     - retract, 
     - merge,
     - resetAccumulator, and 
     - getAccumulatorType.
    ```
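
To make the split between required and optional methods concrete, here is a small average aggregate written as a plain Java class. It is a sketch under stated assumptions: `AvgAccumulator` and `LongAvgAggregate` are hypothetical, the class does not extend Flink's `AggregateFunction`, and the parameter lists of `accumulate`, `retract`, and `resetAccumulator` are guesses. Only the `merge(accumulator, iterable)` shape follows the generated code quoted in this thread, and `getAccumulatorType` is left out.

```java
import java.util.Collections;

// Hypothetical accumulator holding a running sum and count.
class AvgAccumulator {
    long sum = 0L;
    long count = 0L;
}

// Hypothetical aggregate computing an integer average of long values.
class LongAvgAggregate {

    // --- required methods ---

    public AvgAccumulator createAccumulator() {
        return new AvgAccumulator();
    }

    public void accumulate(AvgAccumulator acc, long value) {
        acc.sum += value;
        acc.count++;
    }

    public Long getValue(AvgAccumulator acc) {
        // No input yet means there is no average to report.
        return acc.count == 0 ? null : acc.sum / acc.count;
    }

    // --- optional methods ---

    public void retract(AvgAccumulator acc, long value) {
        acc.sum -= value;
        acc.count--;
    }

    public void merge(AvgAccumulator acc, Iterable<AvgAccumulator> others) {
        for (AvgAccumulator other : others) {
            acc.sum += other.sum;
            acc.count += other.count;
        }
    }

    public void resetAccumulator(AvgAccumulator acc) {
        acc.sum = 0L;
        acc.count = 0L;
    }

    public static void main(String[] args) {
        LongAvgAggregate agg = new LongAvgAggregate();
        AvgAccumulator acc = agg.createAccumulator();
        agg.accumulate(acc, 2L);
        agg.accumulate(acc, 4L);
        agg.accumulate(acc, 9L);
        agg.retract(acc, 9L);
        System.out.println(agg.getValue(acc)); // prints 3

        AvgAccumulator other = agg.createAccumulator();
        agg.accumulate(other, 6L);
        agg.merge(acc, Collections.singletonList(other));
        System.out.println(agg.getValue(acc)); // prints 4
    }
}
```

Because none of the optional methods are part of a base interface in this sketch, a caller that needs `retract` or `merge` has to look them up by name, which is the "contracted methods" approach described in the PR.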



[GitHub] flink issue #3762: [FLINK-6361] [table] Refactoring the AggregateFunction in...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3762
  
    Thanks, will merge then



[GitHub] flink pull request #3762: [FLINK-6361] [table] Refactoring the AggregateFunc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3762#discussion_r113067259
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala ---
    @@ -17,36 +17,93 @@
      */
     package org.apache.flink.table.functions
     
    -import java.util.{List => JList}
    -
    -import org.apache.flink.api.common.typeinfo.TypeInformation
    -import org.apache.flink.table.api.TableException
    -
     /**
       * Base class for User-Defined Aggregates.
       *
    -  * @tparam T the type of the aggregation result
    +  * The behavior of an [[AggregateFunction]] can be defined by implementing a series of custom
    +  * methods. An [[AggregateFunction]] needs at least three methods: createAccumulator, getValue,
    +  * and accumulate. There are a few other methods that can be optional to have: retract, merge,
    +  * resetAccumulator, and getAccumulatorType.
    +  *
    +  * All these methods muse be declared publicly, not static and named exactly as the names
    +  * mentioned above. The methods createAccumulator and getValue are defined in the
    +  * [[AggregateFunction]] functions, while other methods are explained below.
    +  *
    +  *
    +  * {{{
    +  * Processes the input values and update the provided accumulator instance. The method
    +  * accumulate can be overloaded with different custom types and arguments. This function is
    --- End diff --
    
    "This function is always a MUST to have." -> "An AggregateFunction requires at least one accumulate method"?
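
The requirement "at least one accumulate method", combined with the rule in the quoted Scaladoc that these methods be public, non-static, and named exactly, could be checked by reflection along the following lines. This is only a sketch: `AccumulateCheck`, `SumAgg`, and `NoAccumulate` are hypothetical names, and it is an assumption that a validation step would count methods this way.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class AccumulateCheck {

    // Hypothetical aggregate with two overloaded accumulate methods.
    static class SumAgg {
        public void accumulate(long[] acc, long value) {
            acc[0] += value;
        }

        public void accumulate(long[] acc, int value) {
            acc[0] += value;
        }
    }

    // Hypothetical aggregate that forgot to declare accumulate.
    static class NoAccumulate {
    }

    /** Counts public, non-static methods named exactly "accumulate". */
    static int countAccumulateMethods(Class<?> clazz) {
        int count = 0;
        // getMethods() returns only public methods, including inherited ones.
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals("accumulate") && !Modifier.isStatic(m.getModifiers())) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countAccumulateMethods(SumAgg.class));       // prints 2
        System.out.println(countAccumulateMethods(NoAccumulate.class)); // prints 0
    }
}
```

A count of zero would be the case the reworded sentence describes: the class cannot be used as an aggregate until at least one `accumulate` overload is added.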



[GitHub] flink pull request #3762: [FLINK-6361] [table] Refactoring the AggregateFunc...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3762#discussion_r113073730
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala ---
    @@ -51,7 +51,7 @@ class BoundedProcessingOverRangeProcessFunctionTest {
     
         val aggregates =
           Array(new LongMinWithRetractAggFunction,
    -            new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_]]]
    +            new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_,_]]]
    --- End diff --
    
    Add a space after the comma: `AggregateFunction[_, _]`.



[GitHub] flink issue #3762: [FLINK-6361] [table] Refactoring the AggregateFunction in...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3762
  
    Hi @shaoxuan-wang, I made another pass over the PR and think it looks good.
    I can address the changes I suggested, replace the `ArrayList` with a `SingleElementIterable`, and then merge the PR.
    
    What do you think?



[GitHub] flink issue #3762: [FLINK-6361] [table] Refactoring the AggregateFunction in...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on the issue:

    https://github.com/apache/flink/pull/3762
  
    Hi @fhueske, it is good to have a dedicated Iterable for pair-merge. Please go ahead. Thanks.

