Posted to issues@flink.apache.org by shaoxuan-wang <gi...@git.apache.org> on 2017/02/27 11:15:55 UTC

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

GitHub user shaoxuan-wang opened a pull request:

    https://github.com/apache/flink/pull/3423

    [FLINK-5768] [table] Apply new aggregation functions for datastream and dataset tables

    This PR includes the following changes:
    1. Change the DataStream aggregation runtime code to use the new aggregation functions (FLINK-5767) and the DataStream aggregate API (FLINK-5582).
    2. DataStream aggregations will always run in incremental mode, as explained in FLINK-5564 on 06/Feb/2017.
    3. Change the DataSet aggregation runtime code to use the new aggregation functions.
    4. Clean up unused classes and methods.
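
    As a rough illustration of what "incremental mode" means in item 2, here is a minimal sketch. The trait `IncrementalAgg`, the object `MaxAgg`, and the helper `run` are hypothetical names made up for this example and are not the Flink API; the point is only that each record is folded into one accumulator as it arrives, instead of buffering all records until the window fires.

```scala
object IncrementalSketch {
  // Hypothetical incremental aggregate: the state is a single accumulator,
  // not the list of buffered input records.
  trait IncrementalAgg[IN, ACC, OUT] {
    def createAccumulator(): ACC
    def add(value: IN, acc: ACC): ACC
    def getResult(acc: ACC): OUT
  }

  // Example aggregate (an assumption for illustration): running maximum.
  object MaxAgg extends IncrementalAgg[Int, Option[Int], Option[Int]] {
    def createAccumulator(): Option[Int] = None
    def add(value: Int, acc: Option[Int]): Option[Int] =
      Some(acc.fold(value)(m => math.max(m, value)))
    def getResult(acc: Option[Int]): Option[Int] = acc
  }

  // Folds every arriving record into the accumulator immediately.
  def run[IN, ACC, OUT](agg: IncrementalAgg[IN, ACC, OUT], input: Iterator[IN]): OUT = {
    var acc = agg.createAccumulator()
    while (input.hasNext) acc = agg.add(input.next(), acc)
    agg.getResult(acc)
  }
}
```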
    
    "mvn verify" has passed under flink-libraries/flink-table
    
    
    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [X] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [X] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shaoxuan-wang/flink F5768-submit

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3423.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3423
    
----
commit 04254e484f6cad4fb722ddcd72dbbd70b1406ce3
Author: shaoxuan-wang <ws...@gmail.com>
Date:   2017-02-27T11:09:30Z

    [FLINK-5768] [table] Apply new aggregation functions for datastream and dataset tables

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103435235
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util.{ArrayList => JArrayList, List => JList}
    +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Aggregate Function used for the aggregate operator in
    +  * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
    +  *
    +  * @param aggregates       the list of all [[org.apache.flink.table.functions.AggregateFunction]]
    +  *                         used for this aggregation
    +  * @param aggFieldsIndex   the position (in the input Row) of the input value for each aggregate
    +  * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
    +  *                         output row => the index of the aggregate) for all the aggregates
    +  * @param groupKeysIndex   the position (in the input Row) of grouping keys
    +  * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
    +  *                         output row => the index of grouping key) for all the grouping keys
    +  * @param finalRowArity    the arity of the final row
    +  */
    +class AggregateAggFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFieldsIndex: Array[Int],
    +    private val aggregateMapping: Array[(Int, Int)],
    --- End diff --
    
    I think we could simplify the interface a bit:
    
    - aggregateMapping could become an `aggOutFields: Array[Int]` which contains the position of each aggregate in the output (`aggOutFields[2]` would give the output position of the third agg function).
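
    A minimal sketch of the difference between the two shapes, under stated assumptions: the method names `setWithMapping` and `setWithOutFields` are hypothetical, a plain `Array[Any]` stands in for `Row`, and the pairs in `aggregateMapping` are taken to be (output position, aggregate index) as the Scaladoc in the diff describes.

```scala
object AggOutFieldsSketch {
  // Current shape: pairs of (output position, aggregate index).
  def setWithMapping(
      results: Array[Any],
      mapping: Array[(Int, Int)],
      arity: Int): Array[Any] = {
    val out = new Array[Any](arity)
    mapping.foreach { case (outPos, aggIdx) => out(outPos) = results(aggIdx) }
    out
  }

  // Suggested shape: aggOutFields(i) is the output position of the i-th aggregate,
  // so the aggregate index is implicit in the array position.
  def setWithOutFields(
      results: Array[Any],
      aggOutFields: Array[Int],
      arity: Int): Array[Any] = {
    val out = new Array[Any](arity)
    var i = 0
    while (i < aggOutFields.length) {
      out(aggOutFields(i)) = results(i)
      i += 1
    }
    out
  }
}
```

    Both variants fill the same output positions; the second only drops the redundant aggregate index from each pair.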




[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103733512
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala ---
    @@ -73,12 +73,25 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction(
           new JArrayList[Accumulator]()
    --- End diff --
    
    Move the list allocation out of `combine()`.
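
    A sketch of what moving the allocation out of the method could look like. The class `ReusingCombiner` is hypothetical, `Long` stands in for `Accumulator`, and the "merge" is a plain sum; only the allocate-once-and-clear pattern is the point.

```scala
import java.util.{ArrayList => JArrayList}

// Hypothetical stand-in for a combine function; Long replaces Accumulator.
class ReusingCombiner(numAggregates: Int) {
  // Allocated once per function instance instead of on every combine() call.
  private val accumulatorLists: Array[JArrayList[Long]] =
    Array.fill(numAggregates)(new JArrayList[Long]())

  def combine(records: Iterable[Array[Long]]): Array[Long] = {
    // Reset the reused lists rather than allocating new ones.
    accumulatorLists.foreach(_.clear())
    for (record <- records; i <- 0 until numAggregates) {
      accumulatorLists(i).add(record(i))
    }
    // The "merge" step is a plain sum in this sketch.
    accumulatorLists.map { list =>
      var sum = 0L
      var j = 0
      while (j < list.size()) { sum += list.get(j); j += 1 }
      sum
    }
  }
}
```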



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103732627
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala ---
    @@ -100,8 +99,10 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
           new JArrayList[Accumulator]()
         }
     
    +    var count:Int = 0
         while (iterator.hasNext) {
           val record = iterator.next()
    +      count += 1
    --- End diff --
    
    Set `count = 0` after clearing the accumulator list in line 119
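
    The quoted diff does not show how `count` is used, so the following is only a sketch of the general pattern: when per-session state is cleared at a session boundary, the counter is reset in the same place. The object `SessionCountSketch`, the `(timestamp, value)` record shape, and the running `sum` standing in for the accumulator list are all assumptions for illustration.

```scala
object SessionCountSketch {
  // Returns (recordCount, sum) per session. A new session starts when the gap
  // between consecutive timestamps exceeds `gap`. Input is assumed sorted by time.
  def sessions(records: Seq[(Long, Long)], gap: Long): Seq[(Int, Long)] = {
    val out = scala.collection.mutable.ArrayBuffer.empty[(Int, Long)]
    var count: Int = 0
    var sum: Long = 0L // stands in for the accumulator list
    var lastTs: Long = 0L
    for ((ts, value) <- records) {
      if (count > 0 && ts - lastTs > gap) {
        out += ((count, sum))
        sum = 0L  // "clear the accumulators"
        count = 0 // reset the counter together with them
      }
      sum += value
      count += 1
      lastTs = ts
    }
    if (count > 0) out += ((count, sum))
    out.toSeq
  }
}
```

    Without the `count = 0` line, the second session would report the record count of both sessions combined.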



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103674711
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util.{ArrayList => JArrayList, List => JList}
    +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Aggregate Function used for the aggregate operator in
    +  * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
    +  *
    +  * @param aggregates       the list of all [[org.apache.flink.table.functions.AggregateFunction]]
    +  *                         used for this aggregation
    +  * @param aggFieldsIndex   the position (in the input Row) of the input value for each aggregate
    +  * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
    +  *                         output row => the index of the aggregate) for all the aggregates
    +  * @param groupKeysIndex   the position (in the input Row) of grouping keys
    +  * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
    +  *                         output row => the index of grouping key) for all the grouping keys
    +  * @param finalRowArity    the arity of the final row
    +  */
    +class AggregateAggFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFieldsIndex: Array[Int],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val groupKeysIndex: Array[Int],
    --- End diff --
    
    Yes, I agree that we can move the group keys and the agg/group mappings to the WindowFunction. However, this PR is dedicated to changing the aggregate API and functions, and we want to merge it as soon as possible so that it does not block other aggregate-related PRs. Can we refactor this later?



[GitHub] flink issue #3423: [FLINK-5768] [table] Apply new aggregation functions for ...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on the issue:

    https://github.com/apache/flink/pull/3423
  
    I think we could further remove AggregateMapFunction and DataSetWindowAggregateMapFunction for windowedDataSet and DataSet, and let the reduce function accumulate the input values directly. But since this PR is already very large in terms of changed lines, I prefer to do this optimization in a separate PR.



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103806696
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -737,101 +632,121 @@ object AggregateUtil {
               aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
                 sqlTypeName match {
                   case TINYINT =>
    -                new ByteMinAggregate
    +                new ByteMinAggFunction
                   case SMALLINT =>
    -                new ShortMinAggregate
    +                new ShortMinAggFunction
                   case INTEGER =>
    -                new IntMinAggregate
    +                new IntMinAggFunction
                   case BIGINT =>
    -                new LongMinAggregate
    +                new LongMinAggFunction
                   case FLOAT =>
    -                new FloatMinAggregate
    +                new FloatMinAggFunction
                   case DOUBLE =>
    -                new DoubleMinAggregate
    +                new DoubleMinAggFunction
                   case DECIMAL =>
    -                new DecimalMinAggregate
    +                new DecimalMinAggFunction
                   case BOOLEAN =>
    -                new BooleanMinAggregate
    +                new BooleanMinAggFunction
                   case sqlType: SqlTypeName =>
                     throw new TableException("Min aggregate does no support type:" + sqlType)
                 }
               } else {
                 sqlTypeName match {
                   case TINYINT =>
    -                new ByteMaxAggregate
    +                new ByteMaxAggFunction
                   case SMALLINT =>
    -                new ShortMaxAggregate
    +                new ShortMaxAggFunction
                   case INTEGER =>
    -                new IntMaxAggregate
    +                new IntMaxAggFunction
                   case BIGINT =>
    -                new LongMaxAggregate
    +                new LongMaxAggFunction
                   case FLOAT =>
    -                new FloatMaxAggregate
    +                new FloatMaxAggFunction
                   case DOUBLE =>
    -                new DoubleMaxAggregate
    +                new DoubleMaxAggFunction
                   case DECIMAL =>
    -                new DecimalMaxAggregate
    +                new DecimalMaxAggFunction
                   case BOOLEAN =>
    -                new BooleanMaxAggregate
    +                new BooleanMaxAggFunction
                   case sqlType: SqlTypeName =>
                     throw new TableException("Max aggregate does no support type:" + sqlType)
                 }
               }
             }
             case _: SqlCountAggFunction =>
    -          aggregates(index) = new CountAggregate
    +          aggregates(index) = new CountAggFunction
             case unSupported: SqlAggFunction =>
               throw new TableException("unsupported Function: " + unSupported.getName)
           }
    -      setAggregateDataOffset(index)
    -    }
    -
    -    // set the aggregate intermediate data start index in Row, and update current value.
    -    def setAggregateDataOffset(index: Int): Unit = {
    -      aggregates(index).setAggOffsetInRow(aggOffset)
    -      aggOffset += aggregates(index).intermediateDataType.length
         }
     
         (aggFieldIndexes, aggregates)
       }
     
    -  private def createAggregateBufferDataType(
    -    groupings: Array[Int],
    -    aggregates: Array[Aggregate[_]],
    -    inputType: RelDataType,
    -    windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
    +  private def createDataSetAggregateBufferDataType(
    +      groupings: Array[Int],
    +      aggregates: Array[TableAggregateFunction[_]],
    +      inputType: RelDataType,
    +      windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
     
         // get the field data types of group keys.
    -    val groupingTypes: Seq[TypeInformation[_]] = groupings
    -      .map(inputType.getFieldList.get(_).getType)
    -      .map(FlinkTypeFactory.toTypeInfo)
    +    val groupingTypes: Seq[TypeInformation[_]] =
    +      groupings
    +        .map(inputType.getFieldList.get(_).getType)
    +        .map(FlinkTypeFactory.toTypeInfo)
     
         // get all field data types of all intermediate aggregates
    -    val aggTypes: Seq[TypeInformation[_]] = aggregates.flatMap(_.intermediateDataType)
    +    val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg =>
    +      val clazz: Class[_] = agg.getClass
    +      TypeInformation.of(clazz)
    +    }
     
         // concat group key types, aggregation types, and window key types
    -    val allFieldTypes:Seq[TypeInformation[_]] = windowKeyTypes match {
    +    val allFieldTypes: Seq[TypeInformation[_]] = windowKeyTypes match {
           case None => groupingTypes ++: aggTypes
           case _ => groupingTypes ++: aggTypes ++: windowKeyTypes.get
         }
    -    new RowTypeInfo(allFieldTypes :_*)
    +    new RowTypeInfo(allFieldTypes: _*)
    +  }
    +
    +  private def createAccumulatorRowType(
    +      inputType: RelDataType,
    +      groupings: Array[Int],
    +      aggregates: Array[TableAggregateFunction[_]]): RowTypeInfo = {
    +
    +    // get the field data types of group keys.
    +    val groupingTypes: Seq[TypeInformation[_]] =
    +      groupings
    +        .map(inputType.getFieldList.get(_).getType)
    +        .map(FlinkTypeFactory.toTypeInfo)
    +
    +    val aggTypes: Seq[TypeInformation[_]] =
    +      aggregates.map {
    +        agg =>
    +          val clazz: Class[_] = agg.getClass
    +          TypeInformation.of(clazz)
    --- End diff --
    
    Same as above. We need the type of the accumulator, not of the aggregate function.
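
    To make the distinction concrete, here is a minimal sketch with hypothetical, simplified stand-ins (`Acc`, `SumAcc`, `AggFunction`, `LongSumAggFunction`) for the table API types. It assumes the function exposes a `createAccumulator()` method; in the real code the resulting class would presumably be handed to `TypeInformation.of`, as in the diff.

```scala
object AccumulatorTypeSketch {
  // Hypothetical, simplified stand-ins for the aggregate function and accumulator types.
  trait Acc
  final class SumAcc extends Acc { var sum: Long = 0L }
  trait AggFunction { def createAccumulator(): Acc }
  final class LongSumAggFunction extends AggFunction {
    override def createAccumulator(): Acc = new SumAcc
  }

  // What the diff does: takes the class of the function itself.
  def functionClasses(aggs: Array[AggFunction]): Seq[Class[_]] =
    aggs.toSeq.map(a => a.getClass)

  // What the comment asks for: the class of the accumulator that is
  // actually stored in the intermediate row.
  def accumulatorClasses(aggs: Array[AggFunction]): Seq[Class[_]] =
    aggs.toSeq.map(a => a.createAccumulator().getClass)
}
```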



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103732265
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala ---
    @@ -100,8 +99,10 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
           new JArrayList[Accumulator]()
    --- End diff --
    
    Move the list allocation out of `reduce()`.



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103835523
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -54,31 +58,31 @@ object AggregateUtil {
         * organized by the following format:
         *
         * {{{
    -    *                   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
    -    *                             |                          |
    -    *                             v                          v
    -    *        +---------+---------+--------+--------+--------+--------+
    -    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
    -    *        +---------+---------+--------+--------+--------+--------+
    +    *                          avg(x)                             count(z)
    +    *                             |                                   |
    +    *                             v                                   v
    +    *        +---------+---------+-----------------+------------------+------------------+
    +    *        |groupKey1|groupKey2|  AvgAccumulator |  SumAccumulator  | CountAccumulator |
    +    *        +---------+---------+-----------------+------------------+------------------+
         *                                              ^
         *                                              |
    -    *                               sum(y) aggOffsetInRow = 4
    +    *                                           sum(y)
         * }}}
         *
         */
       private[flink] def createPrepareMapFunction(
           namedAggregates: Seq[CalcitePair[AggregateCall, String]],
           groupings: Array[Int],
           inputType: RelDataType)
    -    : MapFunction[Row, Row] = {
    --- End diff --
    
    OK. Based on the DEV discussion, I thought we were going to fix the code style in new PRs. But you are right, I should apply style changes only to the new lines. I will discard the "style only" changes.



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103476693
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggregateMapFunction.scala ---
    @@ -34,13 +35,13 @@ import org.apache.flink.util.Preconditions
       * append an (aligned) rowtime field to the end of the output row.
       */
     class DataSetWindowAggregateMapFunction(
    --- End diff --
    
    We could remove this method if we rework the subsequent combine and groupReduce functions. This would allow us to use `accumulate` in `combine()` or in a non-combined `groupReduce()`.
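
    A small sketch of the idea, with a hypothetical `AvgAcc` accumulator that is not Flink's `Accumulator` interface: the two-step variant first maps every value into its own accumulator and then merges them, while the direct variant accumulates raw input values into a single accumulator, which makes the preceding map step unnecessary.

```scala
object DirectAccumulateSketch {
  // Hypothetical accumulator for an average; not Flink's Accumulator interface.
  final class AvgAcc { var sum: Long = 0L; var count: Long = 0L }

  // Two-step variant: a map step wraps each value in its own accumulator,
  // and a later step merges all of them.
  def viaMapAndMerge(values: Seq[Long]): Double = {
    val mapped = values.map { v =>
      val a = new AvgAcc
      a.sum = v
      a.count = 1L
      a
    }
    val merged = new AvgAcc
    mapped.foreach { a => merged.sum += a.sum; merged.count += a.count }
    merged.sum.toDouble / merged.count
  }

  // Direct variant: accumulate raw input values into one accumulator.
  def viaAccumulate(values: Seq[Long]): Double = {
    val acc = new AvgAcc
    values.foreach { v => acc.sum += v; acc.count += 1L }
    acc.sum.toDouble / acc.count
  }
}
```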



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103435665
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util.{ArrayList => JArrayList, List => JList}
    +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Aggregate Function used for the aggregate operator in
    +  * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
    +  *
    +  * @param aggregates       the list of all [[org.apache.flink.table.functions.AggregateFunction]]
    +  *                         used for this aggregation
    +  * @param aggFieldsIndex   the position (in the input Row) of the input value for each aggregate
    +  * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
    +  *                         output row => the index of the aggregate) for all the aggregates
    +  * @param groupKeysIndex   the position (in the input Row) of grouping keys
    +  * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
    +  *                         output row => the index of grouping key) for all the grouping keys
    +  * @param finalRowArity    the arity of the final row
    +  */
    +class AggregateAggFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFieldsIndex: Array[Int],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val groupKeysIndex: Array[Int],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val finalRowArity: Int)
    --- End diff --
    
    I think we do not need this parameter. The result row should have `aggregates.length` fields. All other fields (group keys, window properties) can be added in the `WindowFunction`.
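
    A minimal sketch of that split, using hypothetical helpers (`aggregateResult`, `finalRow`), an `Array[Any]` in place of `Row`, and a single window-end timestamp as the only window property: the aggregate step returns exactly one field per aggregate, and a later step assembles the final row.

```scala
object SplitResultSketch {
  // Step 1: the aggregate function's result holds only the aggregate values,
  // so its arity is the number of aggregates.
  def aggregateResult(accumulators: Array[Long]): Array[Any] =
    accumulators.map(a => a: Any)

  // Step 2: a later step (the WindowFunction in the comment) adds the
  // grouping key and the window property around the aggregate values.
  def finalRow(key: Any, aggResult: Array[Any], windowEnd: Long): Array[Any] =
    (key +: aggResult) :+ windowEnd
}
```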



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103883798
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -737,101 +632,121 @@ object AggregateUtil {
               aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
                 sqlTypeName match {
                   case TINYINT =>
    -                new ByteMinAggregate
    +                new ByteMinAggFunction
                   case SMALLINT =>
    -                new ShortMinAggregate
    +                new ShortMinAggFunction
                   case INTEGER =>
    -                new IntMinAggregate
    +                new IntMinAggFunction
                   case BIGINT =>
    -                new LongMinAggregate
    +                new LongMinAggFunction
                   case FLOAT =>
    -                new FloatMinAggregate
    +                new FloatMinAggFunction
                   case DOUBLE =>
    -                new DoubleMinAggregate
    +                new DoubleMinAggFunction
                   case DECIMAL =>
    -                new DecimalMinAggregate
    +                new DecimalMinAggFunction
                   case BOOLEAN =>
    -                new BooleanMinAggregate
    +                new BooleanMinAggFunction
                   case sqlType: SqlTypeName =>
                     throw new TableException("Min aggregate does no support type:" + sqlType)
                 }
               } else {
                 sqlTypeName match {
                   case TINYINT =>
    -                new ByteMaxAggregate
    +                new ByteMaxAggFunction
                   case SMALLINT =>
    -                new ShortMaxAggregate
    +                new ShortMaxAggFunction
                   case INTEGER =>
    -                new IntMaxAggregate
    +                new IntMaxAggFunction
                   case BIGINT =>
    -                new LongMaxAggregate
    +                new LongMaxAggFunction
                   case FLOAT =>
    -                new FloatMaxAggregate
    +                new FloatMaxAggFunction
                   case DOUBLE =>
    -                new DoubleMaxAggregate
    +                new DoubleMaxAggFunction
                   case DECIMAL =>
    -                new DecimalMaxAggregate
    +                new DecimalMaxAggFunction
                   case BOOLEAN =>
    -                new BooleanMaxAggregate
    +                new BooleanMaxAggFunction
                   case sqlType: SqlTypeName =>
                     throw new TableException("Max aggregate does no support type:" + sqlType)
                 }
               }
             }
             case _: SqlCountAggFunction =>
    -          aggregates(index) = new CountAggregate
    +          aggregates(index) = new CountAggFunction
             case unSupported: SqlAggFunction =>
               throw new TableException("unsupported Function: " + unSupported.getName)
           }
    -      setAggregateDataOffset(index)
    -    }
    -
    -    // set the aggregate intermediate data start index in Row, and update current value.
    -    def setAggregateDataOffset(index: Int): Unit = {
    -      aggregates(index).setAggOffsetInRow(aggOffset)
    -      aggOffset += aggregates(index).intermediateDataType.length
         }
     
         (aggFieldIndexes, aggregates)
       }
     
    -  private def createAggregateBufferDataType(
    -    groupings: Array[Int],
    -    aggregates: Array[Aggregate[_]],
    -    inputType: RelDataType,
    -    windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
    +  private def createDataSetAggregateBufferDataType(
    +      groupings: Array[Int],
    +      aggregates: Array[TableAggregateFunction[_]],
    +      inputType: RelDataType,
    +      windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
     
         // get the field data types of group keys.
    -    val groupingTypes: Seq[TypeInformation[_]] = groupings
    -      .map(inputType.getFieldList.get(_).getType)
    -      .map(FlinkTypeFactory.toTypeInfo)
    +    val groupingTypes: Seq[TypeInformation[_]] =
    +      groupings
    +        .map(inputType.getFieldList.get(_).getType)
    +        .map(FlinkTypeFactory.toTypeInfo)
     
         // get all field data types of all intermediate aggregates
    -    val aggTypes: Seq[TypeInformation[_]] = aggregates.flatMap(_.intermediateDataType)
    +    val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg =>
    +      val clazz: Class[_] = agg.getClass
    --- End diff --
    
    Thanks for taking the time to look into this. I played around with your approach; it works well except for classes with a Template. I think it is fair to just return a generic type for an accumulator with a Template. It is not very efficient, but that is the user's call.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103733543
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala ---
    @@ -73,12 +73,25 @@ class DataSetTumbleTimeWindowAggReduceCombineFunction(
           new JArrayList[Accumulator]()
         }
     
    -    // per each aggregator, collect its accumulators to a list
    +    var count:Int = 0
    --- End diff --
    
    Missing space after the colon: `var count: Int = 0`.


---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103469278
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala ---
    @@ -19,61 +19,71 @@
     package org.apache.flink.table.runtime.aggregate
     
     import java.lang.Iterable
    +import java.util.{ArrayList => JArrayList}
     
     import org.apache.flink.api.common.functions.CombineFunction
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
     import org.apache.flink.types.Row
     
    -import scala.collection.JavaConversions._
    -
     /**
    - * It wraps the aggregate logic inside of
    - * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
    - * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
    - *
    - * @param aggregates          The aggregate functions.
    - * @param groupKeysMapping    The index mapping of group keys between intermediate aggregate Row
    - *                            and output Row.
    - * @param aggregateMapping    The index mapping between aggregate function list and aggregated value
    - *                            index in output Row.
    - * @param groupingSetsMapping The index mapping of keys in grouping sets between intermediate
    - *                            Row and output Row.
    - */
    +  * It wraps the aggregate logic inside of
    +  * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
    +  * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
    +  *
    +  * @param aggregates          The aggregate functions.
    +  * @param groupKeysMapping    The index mapping of group keys between intermediate aggregate Row
    +  *                            and output Row.
    +  * @param aggregateMapping    The index mapping between aggregate function list and aggregated
    +  *                            value
    +  *                            index in output Row.
    +  * @param groupingSetsMapping The index mapping of keys in grouping sets between intermediate
    +  *                            Row and output Row.
    +  * @param finalRowArity       the arity of the final resulting row
    +  */
     class AggregateReduceCombineFunction(
    -    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val aggregates: Array[AggregateFunction[_ <: Any]],
         private val groupKeysMapping: Array[(Int, Int)],
         private val aggregateMapping: Array[(Int, Int)],
         private val groupingSetsMapping: Array[(Int, Int)],
    -    private val intermediateRowArity: Int,
         private val finalRowArity: Int)
       extends AggregateReduceGroupFunction(
         aggregates,
         groupKeysMapping,
         aggregateMapping,
         groupingSetsMapping,
    -    intermediateRowArity,
    -    finalRowArity)
    -  with CombineFunction[Row, Row] {
    +    finalRowArity) with CombineFunction[Row, Row] {
     
       /**
    -   * For sub-grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    -   *
    -   * @param records  Sub-grouped intermediate aggregate Rows iterator.
    -   * @return Combined intermediate aggregate Row.
    -   *
    -   */
    +    * For sub-grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    +    *
    +    * @param records Sub-grouped intermediate aggregate Rows iterator.
    +    * @return Combined intermediate aggregate Row.
    +    *
    +    */
       override def combine(records: Iterable[Row]): Row = {
     
    -    // Initiate intermediate aggregate value.
    -    aggregates.foreach(_.initiate(aggregateBuffer))
    -
    -    // Merge intermediate aggregate value to buffer.
    +    // merge intermediate aggregate value to buffer.
         var last: Row = null
    -    records.foreach((record) => {
    -      aggregates.foreach(_.merge(record, aggregateBuffer))
    +    val iterator = records.iterator()
    +    val accumulatorList = Array.fill(aggregates.length) {
    +      new JArrayList[Accumulator]()
    +    }
    +
    +    while (iterator.hasNext) {
    +      val record = iterator.next()
    +      for (i <- aggregates.indices) {
    +        accumulatorList(i).add(
    --- End diff --
    
    Same as for `reduce()`. We cannot materialize the whole group but must merge the accumulators pairwise.
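    
    A minimal sketch of what pairwise merging could look like, as opposed to collecting every accumulator of the group into a list first. `LongSumAcc` and `PairwiseMerge` are hypothetical stand-ins for illustration and are not the interfaces used in this PR.
    
    ```scala
    // Hypothetical accumulator, used only for this illustration.
    final class LongSumAcc(var sum: Long = 0L)

    object PairwiseMerge {
      // Merge `other` into `acc` in place and return `acc`.
      def merge(acc: LongSumAcc, other: LongSumAcc): LongSumAcc = {
        acc.sum += other.sum
        acc
      }

      // Fold the iterator into one running accumulator, so memory use
      // stays constant regardless of the size of the group.
      def combine(records: Iterator[LongSumAcc]): LongSumAcc = {
        val result = new LongSumAcc()
        while (records.hasNext) {
          merge(result, records.next())
        }
        result
      }
    }
    ```
    
    Only one running accumulator is held at any time, so the group is never buffered in memory.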


---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103728943
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -80,11 +81,25 @@ class AggregateReduceGroupFunction(
           new JArrayList[Accumulator]()
    --- End diff --
    
    Move out of the `reduce` function


---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103475581
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala ---
    @@ -63,27 +65,41 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
     
         val iterator = records.iterator()
     
    +    val accumulatorList = Array.fill(aggregates.length) {
    +      new JArrayList[Accumulator]()
    +    }
    +
         while (iterator.hasNext) {
           val record = iterator.next()
    +
           if (count == 0) {
    -        // initiate intermediate aggregate value.
    -        aggregates.foreach(_.initiate(aggregateBuffer))
    +        // clear the accumulator list for all aggregate
    +        for (i <- aggregates.indices) {
    +          accumulatorList(i).clear()
    +        }
           }
    -      // merge intermediate aggregate value to buffer.
    -      aggregates.foreach(_.merge(record, aggregateBuffer))
     
    +      // collect the accumulators for each aggregate
    +      for (i <- aggregates.indices) {
    +        accumulatorList(i).add(record.getField(accumStartPos + i).asInstanceOf[Accumulator])
    --- End diff --
    
    pairwise merging


---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103469070
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -18,32 +18,33 @@
     package org.apache.flink.table.runtime.aggregate
     
     import java.lang.Iterable
    +import java.util.{ArrayList => JArrayList}
     
     import org.apache.flink.api.common.functions.RichGroupReduceFunction
     import org.apache.flink.configuration.Configuration
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
     import org.apache.flink.types.Row
     import org.apache.flink.util.{Collector, Preconditions}
     
    -import scala.collection.JavaConversions._
    -
     /**
    - * It wraps the aggregate logic inside of
    - * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
    - *
    - * @param aggregates          The aggregate functions.
    - * @param groupKeysMapping    The index mapping of group keys between intermediate aggregate Row
    - *                            and output Row.
    - * @param aggregateMapping    The index mapping between aggregate function list and aggregated value
    - *                            index in output Row.
    - * @param groupingSetsMapping The index mapping of keys in grouping sets between intermediate
    - *                            Row and output Row.
    - */
    +  * It wraps the aggregate logic inside of
    +  * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
    +  *
    +  * @param aggregates          The aggregate functions.
    +  * @param groupKeysMapping    The index mapping of group keys between intermediate aggregate Row
    +  *                            and output Row.
    +  * @param aggregateMapping    The index mapping between aggregate function list and aggregated
    +  *                            value
    +  *                            index in output Row.
    +  * @param groupingSetsMapping The index mapping of keys in grouping sets between intermediate
    +  *                            Row and output Row.
    +  * @param finalRowArity       The arity of the final resulting row
    +  */
     class AggregateReduceGroupFunction(
    -    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val aggregates: Array[AggregateFunction[_ <: Any]],
         private val groupKeysMapping: Array[(Int, Int)],
    --- End diff --
    
    The group key positions in the input are known, right?
    So we can also use an `Array[Int]` instead?


---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103436838
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util.{ArrayList => JArrayList, List => JList}
    +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Aggregate Function used for the aggregate operator in
    +  * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
    +  *
    +  * @param aggregates       the list of all [[org.apache.flink.table.functions.AggregateFunction]]
    +  *                         used for this aggregation
    +  * @param aggFieldsIndex   the position (in the input Row) of the input value for each aggregate
    +  * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
    +  *                         output row => the index of the aggregate) for all the aggregates
    +  * @param groupKeysIndex   the position (in the input Row) of grouping keys
    +  * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
    +  *                         output row => the index of grouping key) for all the grouping keys
    +  * @param finalRowArity    the arity of the final row
    +  */
    +class AggregateAggFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFieldsIndex: Array[Int],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val groupKeysIndex: Array[Int],
    --- End diff --
    
    I think we do not need to include the `groupKeys` here. They can be added in the `WindowFunction`.


---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103807487
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -54,31 +58,31 @@ object AggregateUtil {
         * organized by the following format:
         *
         * {{{
    -    *                   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
    -    *                             |                          |
    -    *                             v                          v
    -    *        +---------+---------+--------+--------+--------+--------+
    -    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
    -    *        +---------+---------+--------+--------+--------+--------+
    +    *                          avg(x)                             count(z)
    +    *                             |                                   |
    +    *                             v                                   v
    +    *        +---------+---------+-----------------+------------------+------------------+
    +    *        |groupKey1|groupKey2|  AvgAccumulator |  SumAccumulator  | CountAccumulator |
    +    *        +---------+---------+-----------------+------------------+------------------+
         *                                              ^
         *                                              |
    -    *                               sum(y) aggOffsetInRow = 4
    +    *                                           sum(y)
         * }}}
         *
         */
       private[flink] def createPrepareMapFunction(
           namedAggregates: Seq[CalcitePair[AggregateCall, String]],
           groupings: Array[Int],
           inputType: RelDataType)
    -    : MapFunction[Row, Row] = {
    --- End diff --
    
    There are many reformatting changes in this PR. This makes the diffs hard to read. 
    
    Unfortunately, we do not have a fixed code style so we keep changing the formatting back and forth. IMO, the best approach is to not reformat large parts of a file because it hides the relevant changes and somebody might just change it back.


---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103733986
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala ---
    @@ -74,22 +75,28 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
           new JArrayList[Accumulator]()
         }
     
    -    // per each aggregator, collect its accumulators to a list
    +    var count:Int = 0
    --- End diff --
    
    Missing space after the colon: `var count: Int = 0`.


---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103472747
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -54,44 +55,51 @@ class AggregateReduceGroupFunction(
       override def open(config: Configuration) {
         Preconditions.checkNotNull(aggregates)
         Preconditions.checkNotNull(groupKeysMapping)
    -    aggregateBuffer = new Row(intermediateRowArity)
    +    aggregateBuffer = new Row(aggregates.length + groupKeysMapping.length)
         output = new Row(finalRowArity)
         if (!groupingSetsMapping.isEmpty) {
           intermediateGroupKeys = Some(groupKeysMapping.map(_._1))
         }
       }
     
       /**
    -   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    -   * calculate aggregated values output by aggregate buffer, and set them into output 
    -   * Row based on the mapping relation between intermediate aggregate data and output data.
    -   *
    -   * @param records  Grouped intermediate aggregate Rows iterator.
    -   * @param out The collector to hand results to.
    -   *
    -   */
    +    * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    +    * calculate aggregated values output by aggregate buffer, and set them into output
    +    * Row based on the mapping relation between intermediate aggregate data and output data.
    +    *
    +    * @param records Grouped intermediate aggregate Rows iterator.
    +    * @param out     The collector to hand results to.
    +    *
    +    */
       override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
     
    -    // Initiate intermediate aggregate value.
    -    aggregates.foreach(_.initiate(aggregateBuffer))
    -
    -    // Merge intermediate aggregate value to buffer.
    +    // merge intermediate aggregate value to buffer.
         var last: Row = null
    -    records.foreach((record) => {
    -      aggregates.foreach(_.merge(record, aggregateBuffer))
    +    val iterator = records.iterator()
    +    val accumulatorList = Array.fill(aggregates.length) {
    +      new JArrayList[Accumulator]()
    +    }
    +
    +    while (iterator.hasNext) {
    +      val record = iterator.next()
    +      for (i <- aggregates.indices) {
    +        accumulatorList(i).add(
    --- End diff --
    
    I'm wondering whether it would be better to have no preparing mapper and instead have two separate paths, one for combinable and one for non-combinable aggregates:
    - Combinable: `input -> groupCombine() -> groupReduce()` where `groupCombine` uses `accumulate()` and `groupReduce` pairwise merges.
    - Non-Combinable: `input -> groupReduce()` where `groupReduce` uses `accumulate()`
    
    The combinable `groupCombine` and non-combinable `groupReduce` might even use the same code (except that `groupReduce` needs to call `getValue()` in the end).
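    
    A rough sketch of the two paths under simplifying assumptions: the `Agg` trait, `SumAgg`, and `TwoPaths` below are hypothetical and only mimic the `accumulate()`, merge, and `getValue()` steps named above; they are not Flink's `AggregateFunction`.
    
    ```scala
    // Hypothetical aggregate interface for illustration only.
    trait Agg[IN, ACC, OUT] {
      def createAccumulator(): ACC
      def accumulate(acc: ACC, value: IN): ACC
      def merge(a: ACC, b: ACC): ACC
      def getValue(acc: ACC): OUT
    }

    object SumAgg extends Agg[Int, Long, Long] {
      def createAccumulator(): Long = 0L
      def accumulate(acc: Long, value: Int): Long = acc + value
      def merge(a: Long, b: Long): Long = a + b
      def getValue(acc: Long): Long = acc
    }

    object TwoPaths {
      // Shared by the combinable groupCombine and the non-combinable groupReduce.
      def accumulateAll[IN, ACC, OUT](agg: Agg[IN, ACC, OUT], input: Iterator[IN]): ACC = {
        var acc = agg.createAccumulator()
        while (input.hasNext) acc = agg.accumulate(acc, input.next())
        acc
      }

      // Non-combinable: input -> groupReduce (accumulate, then getValue).
      def reduceOnly[IN, ACC, OUT](agg: Agg[IN, ACC, OUT], input: Iterator[IN]): OUT =
        agg.getValue(accumulateAll(agg, input))

      // Combinable: input -> groupCombine (accumulate per chunk)
      //                   -> groupReduce (pairwise merge, then getValue).
      def combineThenReduce[IN, ACC, OUT](agg: Agg[IN, ACC, OUT], chunks: Seq[Seq[IN]]): OUT = {
        val partials = chunks.iterator.map(c => accumulateAll(agg, c.iterator))
        var acc = agg.createAccumulator()
        while (partials.hasNext) acc = agg.merge(acc, partials.next())
        agg.getValue(acc)
      }
    }
    ```
    
    Both paths reuse `accumulateAll`; they differ only in whether a merge step sits between accumulation and `getValue()`.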


---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103477827
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala ---
    @@ -55,10 +47,10 @@ class IncrementalAggregateTimeWindowFunction(
       }
     
       override def apply(
    -    key: Tuple,
    -    window: TimeWindow,
    -    records: Iterable[Row],
    -    out: Collector[Row]): Unit = {
    +      key: Tuple,
    --- End diff --
    
    If we omit the grouping keys in the `AggregateAggFunction`, we can get them from the `Tuple`.
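    
    As an illustration, the output row could be assembled from the window key and the pre-aggregated values roughly as follows. This is a sketch only: `AssembleOutput` is a hypothetical helper, plain Scala collections stand in for the `Tuple` and `Row` types, and the mappings are read as (position in the output row, index of the key or aggregate), following the parameter descriptions in the `AggregateAggFunction` Scaladoc.
    
    ```scala
    object AssembleOutput {
      // `key` stands in for the window key, `aggValues` for the aggregate results.
      def assemble(
          key: IndexedSeq[Any],
          aggValues: IndexedSeq[Any],
          finalRowArity: Int,
          groupKeysMapping: Array[(Int, Int)],
          aggregateMapping: Array[(Int, Int)]): Array[Any] = {

        val output = new Array[Any](finalRowArity)

        // Copy each grouping key from the window key to its output position.
        groupKeysMapping.foreach { case (outPos, keyIdx) =>
          output(outPos) = key(keyIdx)
        }

        // Copy each aggregate result to its output position.
        aggregateMapping.foreach { case (outPos, aggIdx) =>
          output(outPos) = aggValues(aggIdx)
        }

        output
      }
    }
    ```
    
    With this split, the accumulator only needs to carry the aggregate state, and the keys never enter it.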


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103730749
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala ---
    @@ -76,8 +75,10 @@ class DataSetSessionWindowAggregateCombineGroupFunction(
           new JArrayList[Accumulator]()
    --- End diff --
    
    Move out of `combine()`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103727403
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala ---
    @@ -69,11 +69,25 @@ class AggregateReduceCombineFunction(
           new JArrayList[Accumulator]()
    --- End diff --
    
    Move the array and the ArrayLists out of the `combine()` method and clean it at the beginning of `combine()` instead of creating new objects.
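    
    A small sketch of the reuse pattern, assuming a simplified setting: `CountAcc` and `ReusingCombiner` are hypothetical names, and summing the counts stands in for whatever the real function does with the collected accumulators.
    
    ```scala
    import java.util.{ArrayList => JArrayList}

    // Hypothetical accumulator type, used only for this illustration.
    final class CountAcc(val count: Long)

    class ReusingCombiner(numAggregates: Int) {
      // Allocated once per function instance instead of once per combine() call.
      private val accumulatorList: Array[JArrayList[CountAcc]] =
        Array.fill(numAggregates)(new JArrayList[CountAcc]())

      // Returns the per-aggregate totals for one group of records.
      def combine(records: Iterator[Array[CountAcc]]): Array[Long] = {
        // Drop whatever the previous group left behind.
        accumulatorList.foreach(_.clear())

        while (records.hasNext) {
          val record = records.next()
          for (i <- 0 until numAggregates) {
            accumulatorList(i).add(record(i))
          }
        }

        accumulatorList.map { list =>
          var total = 0L
          val it = list.iterator()
          while (it.hasNext) total += it.next().count
          total
        }
      }
    }
    ```
    
    Clearing at the start of `combine()` keeps consecutive groups from leaking state into each other while avoiding a fresh allocation per call.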


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103674994
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util.{ArrayList => JArrayList, List => JList}
    +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Aggregate Function used for the aggregate operator in
    +  * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
    +  *
    +  * @param aggregates       the list of all [[org.apache.flink.table.functions.AggregateFunction]]
    +  *                         used for this aggregation
    +  * @param aggFieldsIndex   the position (in the input Row) of the input value for each aggregate
    +  * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
    +  *                         output row => the index of the aggregate) for all the aggregates
    +  * @param groupKeysIndex   the position (in the input Row) of grouping keys
    +  * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
    +  *                         output row => the index of grouping key) for all the grouping keys
    +  * @param finalRowArity    the arity of the final row
    +  */
    +class AggregateAggFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFieldsIndex: Array[Int],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val groupKeysIndex: Array[Int],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val finalRowArity: Int)
    +  extends ApiAggFunction[Row, Row, Row] {
    +
    +  override def createAccumulator(): Row = {
    +    val accumulatorRow: Row = new Row(groupKeysIndex.length + aggregates.length)
    +
    +    for (i <- aggregates.indices) {
    +      accumulatorRow.setField(groupKeysIndex.length + i, aggregates(i).createAccumulator())
    +    }
    +    accumulatorRow
    +  }
    +
    +  override def add(value: Row, accumulatorRow: Row) = {
    +    for (i <- groupKeysIndex.indices) {
    +      accumulatorRow.setField(i, value.getField(groupKeysIndex(i)))
    +    }
    +
    +    for (i <- aggregates.indices) {
    +      val accumulator =
    +        accumulatorRow.getField(i + groupKeysIndex.length).asInstanceOf[Accumulator]
    +      val v = value.getField(aggFieldsIndex(i))
    +      aggregates(i).accumulate(accumulator, v)
    +    }
    +  }
    +
    +  override def getResult(accumulatorRow: Row): Row = {
    +    val output = new Row(finalRowArity)
    --- End diff --
    
    It will be exactly the same as `aggregates.length` once we remove the group keys from the accumulator state. Let us refactor this later.


---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103468984
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -18,32 +18,33 @@
     package org.apache.flink.table.runtime.aggregate
     
     import java.lang.Iterable
    +import java.util.{ArrayList => JArrayList}
     
     import org.apache.flink.api.common.functions.RichGroupReduceFunction
     import org.apache.flink.configuration.Configuration
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
     import org.apache.flink.types.Row
     import org.apache.flink.util.{Collector, Preconditions}
     
    -import scala.collection.JavaConversions._
    -
     /**
    - * It wraps the aggregate logic inside of
    - * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
    - *
    - * @param aggregates          The aggregate functions.
    - * @param groupKeysMapping    The index mapping of group keys between intermediate aggregate Row
    - *                            and output Row.
    - * @param aggregateMapping    The index mapping between aggregate function list and aggregated value
    - *                            index in output Row.
    - * @param groupingSetsMapping The index mapping of keys in grouping sets between intermediate
    - *                            Row and output Row.
    - */
    +  * It wraps the aggregate logic inside of
    +  * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
    +  *
    +  * @param aggregates          The aggregate functions.
    +  * @param groupKeysMapping    The index mapping of group keys between intermediate aggregate Row
    +  *                            and output Row.
    +  * @param aggregateMapping    The index mapping between aggregate function list and aggregated
    +  *                            value
    +  *                            index in output Row.
    +  * @param groupingSetsMapping The index mapping of keys in grouping sets between intermediate
    +  *                            Row and output Row.
    +  * @param finalRowArity       The arity of the final resulting row
    +  */
     class AggregateReduceGroupFunction(
    -    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val aggregates: Array[AggregateFunction[_ <: Any]],
         private val groupKeysMapping: Array[(Int, Int)],
         private val aggregateMapping: Array[(Int, Int)],
    --- End diff --
    
    The positions of the aggregates in the input are known, right?
    So we can also use an `Array[Int]` instead?
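
    A minimal sketch of the suggestion above, assuming each pair is (position in the output row, index of the aggregate) and that aggregate indices run densely from 0. The object and method names are hypothetical and not part of the PR; the point is only that a plain `Array[Int]` indexed by aggregate carries the same information.

```scala
object AggregateMappingSketch {
  // Pair form: (position in the output row, index of the aggregate).
  // The pair ordering is an assumption made for this sketch.
  // Compact form: element i is the output position of aggregate i.
  def toOutputPositions(pairs: Array[(Int, Int)]): Array[Int] = {
    val positions = new Array[Int](pairs.length)
    pairs.foreach { case (outputPos, aggIndex) => positions(aggIndex) = outputPos }
    positions
  }
}
```

    With the compact form, the result of aggregate `i` would be written to `output.setField(positions(i), ...)` without a tuple lookup.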



[GitHub] flink issue #3423: [FLINK-5768] [table] Apply new aggregation functions for ...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on the issue:

    https://github.com/apache/flink/pull/3423
  
    Thanks @fhueske, I updated the PR to limit the batch merge to 16 rows; please take a look. This PR is primarily dedicated to changing DataStream and DataSet to use the new aggregate interface. I noted down all your other valuable suggestions and will definitely address them in the next PRs for UDAGG.



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103434035
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala ---
    @@ -119,110 +119,54 @@ class DataStreamAggregate(
           s"select: ($aggString)"
         val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
     
    -    val mapFunction = AggregateUtil.createPrepareMapFunction(
    -      namedAggregates,
    -      grouping,
    -      inputType)
    -
    -    val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
    -
    -
    -    // check whether all aggregates support partial aggregate
    -    if (AggregateUtil.doAllSupportPartialAggregation(
    -          namedAggregates.map(_.getKey),
    -          inputType,
    -          grouping.length)) {
    -      // do Incremental Aggregation
    -      val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
    -        namedAggregates,
    -        inputType,
    -        getRowType,
    -        grouping)
    -      // grouped / keyed aggregation
    -      if (groupingKeys.length > 0) {
    -        val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
    -          window,
    -          namedAggregates,
    -          inputType,
    -          rowRelDataType,
    -          grouping,
    -          namedProperties)
    +    // grouped / keyed aggregation
    +    if (groupingKeys.length > 0) {
    +      val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
    +        window,
    +        rowRelDataType.getFieldCount,
    +        namedProperties)
     
    -        val keyedStream = mappedInput.keyBy(groupingKeys: _*)
    -        val windowedStream =
    -          createKeyedWindowedStream(window, keyedStream)
    +      val keyedStream = inputDS.keyBy(groupingKeys: _*)
    +      val windowedStream =
    +        createKeyedWindowedStream(window, keyedStream)
               .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
     
    -        windowedStream
    -        .reduce(reduceFunction, windowFunction)
    -        .returns(rowTypeInfo)
    -        .name(keyedAggOpName)
    -      }
    -      // global / non-keyed aggregation
    -      else {
    -        val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
    -          window,
    +      val (aggFunction, accumulatorRowType) =
    +        AggregateUtil.createDataStreamAggregateFunction(
               namedAggregates,
               inputType,
               rowRelDataType,
    -          grouping,
    -          namedProperties)
    -
    -        val windowedStream =
    -          createNonKeyedWindowedStream(window, mappedInput)
    -          .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
    +          grouping)
     
    -        windowedStream
    -        .reduce(reduceFunction, windowFunction)
    -        .returns(rowTypeInfo)
    -        .name(nonKeyedAggOpName)
    -      }
    +      windowedStream
    +        .aggregate(aggFunction, windowFunction, accumulatorRowType, rowTypeInfo, rowTypeInfo)
    +        .name(keyedAggOpName)
         }
    +    // global / non-keyed aggregation
         else {
    -      // do non-Incremental Aggregation
    -      // grouped / keyed aggregation
    -      if (groupingKeys.length > 0) {
    -
    -        val windowFunction = AggregateUtil.createWindowAggregationFunction(
    -          window,
    -          namedAggregates,
    -          inputType,
    -          rowRelDataType,
    -          grouping,
    -          namedProperties)
    +      val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
    --- End diff --
    
    AggregationFunction -> WindowFunction



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103434244
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala ---
    @@ -119,110 +119,54 @@ class DataStreamAggregate(
           s"select: ($aggString)"
         val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
     
    -    val mapFunction = AggregateUtil.createPrepareMapFunction(
    -      namedAggregates,
    -      grouping,
    -      inputType)
    -
    -    val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
    -
    -
    -    // check whether all aggregates support partial aggregate
    -    if (AggregateUtil.doAllSupportPartialAggregation(
    -          namedAggregates.map(_.getKey),
    -          inputType,
    -          grouping.length)) {
    -      // do Incremental Aggregation
    -      val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
    -        namedAggregates,
    -        inputType,
    -        getRowType,
    -        grouping)
    -      // grouped / keyed aggregation
    -      if (groupingKeys.length > 0) {
    -        val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
    -          window,
    -          namedAggregates,
    -          inputType,
    -          rowRelDataType,
    -          grouping,
    -          namedProperties)
    +    // grouped / keyed aggregation
    +    if (groupingKeys.length > 0) {
    +      val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
    +        window,
    +        rowRelDataType.getFieldCount,
    +        namedProperties)
     
    -        val keyedStream = mappedInput.keyBy(groupingKeys: _*)
    -        val windowedStream =
    -          createKeyedWindowedStream(window, keyedStream)
    +      val keyedStream = inputDS.keyBy(groupingKeys: _*)
    +      val windowedStream =
    +        createKeyedWindowedStream(window, keyedStream)
               .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
     
    -        windowedStream
    -        .reduce(reduceFunction, windowFunction)
    -        .returns(rowTypeInfo)
    -        .name(keyedAggOpName)
    -      }
    -      // global / non-keyed aggregation
    -      else {
    -        val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
    -          window,
    +      val (aggFunction, accumulatorRowType) =
    +        AggregateUtil.createDataStreamAggregateFunction(
               namedAggregates,
               inputType,
               rowRelDataType,
    -          grouping,
    -          namedProperties)
    -
    -        val windowedStream =
    -          createNonKeyedWindowedStream(window, mappedInput)
    -          .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
    +          grouping)
     
    -        windowedStream
    -        .reduce(reduceFunction, windowFunction)
    -        .returns(rowTypeInfo)
    -        .name(nonKeyedAggOpName)
    -      }
    +      windowedStream
    +        .aggregate(aggFunction, windowFunction, accumulatorRowType, rowTypeInfo, rowTypeInfo)
    +        .name(keyedAggOpName)
         }
    +    // global / non-keyed aggregation
         else {
    -      // do non-Incremental Aggregation
    -      // grouped / keyed aggregation
    -      if (groupingKeys.length > 0) {
    -
    -        val windowFunction = AggregateUtil.createWindowAggregationFunction(
    -          window,
    -          namedAggregates,
    -          inputType,
    -          rowRelDataType,
    -          grouping,
    -          namedProperties)
    +      val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
    +        window,
    +        rowRelDataType.getFieldCount,
    +        namedProperties)
     
    -        val keyedStream = mappedInput.keyBy(groupingKeys: _*)
    -        val windowedStream =
    -          createKeyedWindowedStream(window, keyedStream)
    -          .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
    +      val windowedStream =
    +        createNonKeyedWindowedStream(window, inputDS)
    +          .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
     
    -        windowedStream
    -        .apply(windowFunction)
    -        .returns(rowTypeInfo)
    -        .name(keyedAggOpName)
    -      }
    -      // global / non-keyed aggregation
    -      else {
    -        val windowFunction = AggregateUtil.createAllWindowAggregationFunction(
    -          window,
    +      val (aggFunction, accumulatorRowType) =
    +        AggregateUtil.createDataStreamAggregateFunction(
               namedAggregates,
               inputType,
               rowRelDataType,
    -          grouping,
    -          namedProperties)
    +          grouping)
    --- End diff --
    
    Make `grouping` an optional parameter or an Option?
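
    The two options could look roughly like this. This is a sketch with hypothetical helper names that only return a description string; it is not the signature of `AggregateUtil.createDataStreamAggregateFunction`.

```scala
object GroupingParamSketch {
  // Variant A: default argument, so non-keyed callers simply omit `grouping`.
  def describeWithDefault(
      aggNames: Seq[String],
      grouping: Array[Int] = Array.empty[Int]): String = {
    val aggs = aggNames.mkString(", ")
    if (grouping.isEmpty) "global: " + aggs
    else "keyed by " + grouping.mkString(",") + ": " + aggs
  }

  // Variant B: explicit Option, so the absence of keys is visible in the type.
  def describeWithOption(
      aggNames: Seq[String],
      grouping: Option[Array[Int]]): String = {
    val aggs = aggNames.mkString(", ")
    grouping match {
      case Some(keys) if keys.nonEmpty => "keyed by " + keys.mkString(",") + ": " + aggs
      case _ => "global: " + aggs
    }
  }
}
```

    Variant A keeps call sites short, while Variant B forces every caller to state whether grouping keys exist.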



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103818719
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util.{ArrayList => JArrayList, List => JList}
    +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Aggregate Function used for the aggregate operator in
    +  * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
    +  *
    +  * @param aggregates       the list of all [[org.apache.flink.table.functions.AggregateFunction]]
    +  *                         used for this aggregation
    +  * @param aggFieldsIndex   the position (in the input Row) of the input value for each aggregate
    +  * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
    +  *                         output row => the index of the aggregate) for all the aggregates
    +  * @param groupKeysIndex   the position (in the input Row) of grouping keys
    +  * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
    +  *                         output row => the index of grouping key) for all the grouping keys
    +  * @param finalRowArity    the arity of the final row
    +  */
    +class AggregateAggFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFieldsIndex: Array[Int],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val groupKeysIndex: Array[Int],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val finalRowArity: Int)
    +  extends ApiAggFunction[Row, Row, Row] {
    +
    +  override def createAccumulator(): Row = {
    +    val accumulatorRow: Row = new Row(groupKeysIndex.length + aggregates.length)
    +
    +    for (i <- aggregates.indices) {
    +      accumulatorRow.setField(groupKeysIndex.length + i, aggregates(i).createAccumulator())
    +    }
    +    accumulatorRow
    +  }
    +
    +  override def add(value: Row, accumulatorRow: Row) = {
    +    for (i <- groupKeysIndex.indices) {
    --- End diff --
    
    I played around a bit and have a commit that removes the grouping keys from the `AggregateFunction` and sets them in the `WindowFunction`.
    I'll put it on top of the PR before it is merged.
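
    The idea can be illustrated without Flink types. In the sketch below (all names hypothetical, sums only), the accumulator has one slot per aggregate and no grouping keys, and the key is attached only when the result is emitted, which is the role the `WindowFunction` would play.

```scala
object KeylessAccumulatorSketch {
  // Accumulator state holds one slot per aggregate and no grouping keys.
  def createAccumulator(numAggregates: Int): Array[Long] =
    new Array[Long](numAggregates)

  // Add one input row: aggFieldsIndex(i) is the input position feeding aggregate i.
  // Every aggregate is a plain sum here, purely for illustration.
  def add(input: Array[Long], aggFieldsIndex: Array[Int], acc: Array[Long]): Unit =
    for (i <- aggFieldsIndex.indices) acc(i) += input(aggFieldsIndex(i))

  // Emit step: the key is supplied from outside and prepended to the aggregate results.
  def emit(key: Seq[Long], acc: Array[Long]): Seq[Long] = key ++ acc.toSeq
}
```

    Under this layout the accumulator arity equals the number of aggregates, so no key fields need to be copied on every `add` call.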



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103729009
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -80,11 +81,25 @@ class AggregateReduceGroupFunction(
           new JArrayList[Accumulator]()
         }
     
    +    var count:Int = 0
    --- End diff --
    
    Missing space after the colon: `count: Int`.



[GitHub] flink issue #3423: [FLINK-5768] [table] Apply new aggregation functions for ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3423
  
    Hi @shaoxuan-wang, sorry. I missed your comment. If you haven't started reworking the batch aggregations yet, I agree to do it later and just change the merging to smaller batches. 
    
    So far, we have always put an emphasis on robustness and tried to avoid memory issues as much as possible. In batch, most of the JVM memory is maintained by Flink and not available for regular user-function objects. So I'd suggest being a bit conservative here and batching 16 rows. We can also run a few benchmarks to check how much the parameter affects performance.



[GitHub] flink issue #3423: [FLINK-5768] [table] Apply new aggregation functions for ...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on the issue:

    https://github.com/apache/flink/pull/3423
  
    @fhueske thanks for the review. I completely agree with your suggestion on "reworking the batch design". Actually, I proposed the same idea to "rework the batch" before your review (maybe you missed my comment on Feb. 27). I was hesitant to make the changes, as I wanted to keep this PR as focused as possible. But since the performance of the current design is a concern, let's do the cleanup all together within this PR. Regarding "pairwise merging", I have a different opinion: for many aggregates, merging a list at once is much more efficient than merging just two accumulators. If we always use "pairwise merging" in the runtime, we will lose the advantage of the merge(List) API defined in the AggregateFunction interface. If memory is a concern, we can limit the size of the List for each merge iteration. I will provide the update very soon.
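
    A rough sketch of list merging with a bounded batch size, as opposed to strictly pairwise merging. The `ListMergeAgg` trait and `LongSumAgg` object are hypothetical stand-ins and not Flink's `AggregateFunction`; the default batch size of 16 follows the value discussed in this thread.

```scala
import java.util.{ArrayList => JArrayList, List => JList}

// Hypothetical stand-in for an aggregate with a list-based merge.
trait ListMergeAgg[ACC] {
  def merge(accumulators: JList[ACC]): ACC
}

// Sums Long accumulators; for illustration only.
object LongSumAgg extends ListMergeAgg[Long] {
  override def merge(accumulators: JList[Long]): Long = {
    var sum = 0L
    val it = accumulators.iterator()
    while (it.hasNext) sum += it.next()
    sum
  }
}

object BatchedMerge {
  // Merges all accumulators while buffering at most `maxBatch` of them at a time.
  def mergeInBatches[ACC](
      agg: ListMergeAgg[ACC],
      accs: Iterator[ACC],
      maxBatch: Int = 16): Option[ACC] = {
    require(maxBatch >= 2, "maxBatch must be at least 2")
    val buffer = new JArrayList[ACC](maxBatch)
    while (accs.hasNext) {
      buffer.add(accs.next())
      if (buffer.size() == maxBatch) {
        // Collapse the full batch into one accumulator and keep it as the first
        // element of the next batch.
        val merged = agg.merge(buffer)
        buffer.clear()
        buffer.add(merged)
      }
    }
    if (buffer.isEmpty) None else Some(agg.merge(buffer))
  }
}
```

    Setting `maxBatch` to 2 degenerates to pairwise merging, so the parameter can be used to benchmark the trade-off between memory footprint and merge efficiency.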



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103673820
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util.{ArrayList => JArrayList, List => JList}
    +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Aggregate Function used for the aggregate operator in
    +  * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
    +  *
    +  * @param aggregates       the list of all [[org.apache.flink.table.functions.AggregateFunction]]
    +  *                         used for this aggregation
    +  * @param aggFieldsIndex   the position (in the input Row) of the input value for each aggregate
    +  * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
    +  *                         output row => the index of the aggregate) for all the aggregates
    +  * @param groupKeysIndex   the position (in the input Row) of grouping keys
    +  * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
    +  *                         output row => the index of grouping key) for all the grouping keys
    +  * @param finalRowArity    the arity of the final row
    +  */
    +class AggregateAggFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFieldsIndex: Array[Int],
    +    private val aggregateMapping: Array[(Int, Int)],
    --- End diff --
    
    Yes, I think a single Array should be good enough, but I am not quite sure why we had this initially. I would prefer to spend some time investigating it before changing it. I think we can make this change at the codegen step.



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103475723
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceCombineFunction.scala ---
    @@ -28,68 +30,74 @@ import org.apache.flink.types.Row
       * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
       * It is used for tumbling time-window on batch.
       *
    -  * @param rowtimePos The rowtime field index in input row
    -  * @param windowSize Tumbling time window size
    -  * @param windowStartPos The relative window-start field position to the last field of output row
    -  * @param windowEndPos The relative window-end field position to the last field of output row
    -  * @param aggregates The aggregate functions.
    +  * @param windowSize       Tumbling time window size
    +  * @param windowStartPos   The relative window-start field position to the last field of output row
    +  * @param windowEndPos     The relative window-end field position to the last field of output row
    +  * @param aggregates       The aggregate functions.
       * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
       *                         and output Row.
       * @param aggregateMapping The index mapping between aggregate function list and aggregated value
       *                         index in output Row.
    -  * @param intermediateRowArity The intermediate row field count
    -  * @param finalRowArity The output row field count
    +  * @param finalRowArity    The output row field count
       */
     class DataSetTumbleTimeWindowAggReduceCombineFunction(
    -    rowtimePos: Int,
         windowSize: Long,
         windowStartPos: Option[Int],
         windowEndPos: Option[Int],
    -    aggregates: Array[Aggregate[_ <: Any]],
    +    aggregates: Array[AggregateFunction[_ <: Any]],
         groupKeysMapping: Array[(Int, Int)],
         aggregateMapping: Array[(Int, Int)],
    -    intermediateRowArity: Int,
         finalRowArity: Int)
       extends DataSetTumbleTimeWindowAggReduceGroupFunction(
    -    rowtimePos,
         windowSize,
         windowStartPos,
         windowEndPos,
         aggregates,
         groupKeysMapping,
         aggregateMapping,
    -    intermediateRowArity,
         finalRowArity)
    -  with CombineFunction[Row, Row] {
    +    with CombineFunction[Row, Row] {
     
       /**
         * For sub-grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
         *
    -    * @param records  Sub-grouped intermediate aggregate Rows iterator.
    +    * @param records Sub-grouped intermediate aggregate Rows iterator.
         * @return Combined intermediate aggregate Row.
         *
         */
       override def combine(records: Iterable[Row]): Row = {
     
    -    // initiate intermediate aggregate value.
    -    aggregates.foreach(_.initiate(aggregateBuffer))
    -
    -    // merge intermediate aggregate value to buffer.
         var last: Row = null
    -
         val iterator = records.iterator()
    +    val accumulatorList = Array.fill(aggregates.length) {
    +      new JArrayList[Accumulator]()
    +    }
    +
    +    // per each aggregator, collect its accumulators to a list
         while (iterator.hasNext) {
           val record = iterator.next()
    -      aggregates.foreach(_.merge(record, aggregateBuffer))
    +      for (i <- aggregates.indices) {
    +        accumulatorList(i).add(
    --- End diff --
    
    pairwise merging



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103885054
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -737,101 +632,121 @@ object AggregateUtil {
               aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
                 sqlTypeName match {
                   case TINYINT =>
    -                new ByteMinAggregate
    +                new ByteMinAggFunction
                   case SMALLINT =>
    -                new ShortMinAggregate
    +                new ShortMinAggFunction
                   case INTEGER =>
    -                new IntMinAggregate
    +                new IntMinAggFunction
                   case BIGINT =>
    -                new LongMinAggregate
    +                new LongMinAggFunction
                   case FLOAT =>
    -                new FloatMinAggregate
    +                new FloatMinAggFunction
                   case DOUBLE =>
    -                new DoubleMinAggregate
    +                new DoubleMinAggFunction
                   case DECIMAL =>
    -                new DecimalMinAggregate
    +                new DecimalMinAggFunction
                   case BOOLEAN =>
    -                new BooleanMinAggregate
    +                new BooleanMinAggFunction
                   case sqlType: SqlTypeName =>
                     throw new TableException("Min aggregate does no support type:" + sqlType)
                 }
               } else {
                 sqlTypeName match {
                   case TINYINT =>
    -                new ByteMaxAggregate
    +                new ByteMaxAggFunction
                   case SMALLINT =>
    -                new ShortMaxAggregate
    +                new ShortMaxAggFunction
                   case INTEGER =>
    -                new IntMaxAggregate
    +                new IntMaxAggFunction
                   case BIGINT =>
    -                new LongMaxAggregate
    +                new LongMaxAggFunction
                   case FLOAT =>
    -                new FloatMaxAggregate
    +                new FloatMaxAggFunction
                   case DOUBLE =>
    -                new DoubleMaxAggregate
    +                new DoubleMaxAggFunction
                   case DECIMAL =>
    -                new DecimalMaxAggregate
    +                new DecimalMaxAggFunction
                   case BOOLEAN =>
    -                new BooleanMaxAggregate
    +                new BooleanMaxAggFunction
                   case sqlType: SqlTypeName =>
                     throw new TableException("Max aggregate does no support type:" + sqlType)
                 }
               }
             }
             case _: SqlCountAggFunction =>
    -          aggregates(index) = new CountAggregate
    +          aggregates(index) = new CountAggFunction
             case unSupported: SqlAggFunction =>
               throw new TableException("unsupported Function: " + unSupported.getName)
           }
    -      setAggregateDataOffset(index)
    -    }
    -
    -    // set the aggregate intermediate data start index in Row, and update current value.
    -    def setAggregateDataOffset(index: Int): Unit = {
    -      aggregates(index).setAggOffsetInRow(aggOffset)
    -      aggOffset += aggregates(index).intermediateDataType.length
         }
     
         (aggFieldIndexes, aggregates)
       }
     
    -  private def createAggregateBufferDataType(
    -    groupings: Array[Int],
    -    aggregates: Array[Aggregate[_]],
    -    inputType: RelDataType,
    -    windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
    +  private def createDataSetAggregateBufferDataType(
    +      groupings: Array[Int],
    +      aggregates: Array[TableAggregateFunction[_]],
    +      inputType: RelDataType,
    +      windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
     
         // get the field data types of group keys.
    -    val groupingTypes: Seq[TypeInformation[_]] = groupings
    -      .map(inputType.getFieldList.get(_).getType)
    -      .map(FlinkTypeFactory.toTypeInfo)
    +    val groupingTypes: Seq[TypeInformation[_]] =
    +      groupings
    +        .map(inputType.getFieldList.get(_).getType)
    +        .map(FlinkTypeFactory.toTypeInfo)
     
         // get all field data types of all intermediate aggregates
    -    val aggTypes: Seq[TypeInformation[_]] = aggregates.flatMap(_.intermediateDataType)
    +    val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg =>
    +      val clazz: Class[_] = agg.getClass
    --- End diff --
    
    Btw, @fhueske, I have not had a chance to dig into this yet. Are you by any chance already aware of why we cannot get the correct Scala type from TypeInformation? If we cannot fix this, I think we should consider implementing all built-in aggregates and the aggregateTestBase in Java, and also recommend that future UDAGG users write their aggregates in Java rather than Scala.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103466494
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -54,44 +55,51 @@ class AggregateReduceGroupFunction(
       override def open(config: Configuration) {
         Preconditions.checkNotNull(aggregates)
         Preconditions.checkNotNull(groupKeysMapping)
    -    aggregateBuffer = new Row(intermediateRowArity)
    +    aggregateBuffer = new Row(aggregates.length + groupKeysMapping.length)
         output = new Row(finalRowArity)
         if (!groupingSetsMapping.isEmpty) {
           intermediateGroupKeys = Some(groupKeysMapping.map(_._1))
         }
       }
     
       /**
    -   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    -   * calculate aggregated values output by aggregate buffer, and set them into output 
    -   * Row based on the mapping relation between intermediate aggregate data and output data.
    -   *
    -   * @param records  Grouped intermediate aggregate Rows iterator.
    -   * @param out The collector to hand results to.
    -   *
    -   */
    +    * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    +    * calculate aggregated values output by aggregate buffer, and set them into output
    +    * Row based on the mapping relation between intermediate aggregate data and output data.
    +    *
    +    * @param records Grouped intermediate aggregate Rows iterator.
    +    * @param out     The collector to hand results to.
    +    *
    +    */
       override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
     
    -    // Initiate intermediate aggregate value.
    -    aggregates.foreach(_.initiate(aggregateBuffer))
    -
    -    // Merge intermediate aggregate value to buffer.
    +    // merge intermediate aggregate value to buffer.
         var last: Row = null
    -    records.foreach((record) => {
    -      aggregates.foreach(_.merge(record, aggregateBuffer))
    +    val iterator = records.iterator()
    +    val accumulatorList = Array.fill(aggregates.length) {
    --- End diff --
    
    Make this a member variable of size 2 for pairwise merging.
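
One way to read this suggestion, as a minimal sketch: rather than collecting every accumulator of a group into a list and merging once at the end, keep one reusable two-slot list as a member and fold each incoming accumulator into the running one. The names `Accumulator`, `SumAccumulator`, `mergeSums`, and `PairwiseMerger` below are hypothetical stand-ins and not the Flink classes touched by this PR; only the idea of a `merge` over a Java list is taken from the discussion.

```scala
import java.util.{ArrayList => JArrayList, List => JList}

// Hypothetical stand-in for an aggregate's accumulator type.
trait Accumulator

final class SumAccumulator(var sum: Long) extends Accumulator

object PairwiseMergeSketch {

  // Hypothetical merge function: folds all accumulators in the list into the first one.
  def mergeSums(accs: JList[Accumulator]): Accumulator = {
    val result = accs.get(0).asInstanceOf[SumAccumulator]
    var i = 1
    while (i < accs.size()) {
      result.sum += accs.get(i).asInstanceOf[SumAccumulator].sum
      i += 1
    }
    result
  }

  // Owns a two-slot list that is allocated once and reused for every merge call.
  final class PairwiseMerger(merge: JList[Accumulator] => Accumulator) {
    private val pair: JList[Accumulator] = new JArrayList[Accumulator](2)
    pair.add(null)
    pair.add(null)

    // Folds the records pairwise: (running, next) -> running.
    // Assumes at least one record, as is the case for a non-empty group.
    def mergeAll(records: Iterator[Accumulator]): Accumulator = {
      var running: Accumulator = records.next()
      while (records.hasNext) {
        pair.set(0, running)
        pair.set(1, records.next())
        running = merge(pair)
      }
      running
    }
  }
}
```

With this shape, the memory held per aggregate stays constant regardless of how many records a group contains, and no list is allocated per `reduce()` call.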



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3423



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103475803
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala ---
    @@ -68,29 +68,42 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
     
       override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
     
    -    // initiate intermediate aggregate value.
    -    aggregates.foreach(_.initiate(aggregateBuffer))
    -
    -    // merge intermediate aggregate value to buffer.
         var last: Row = null
    -
         val iterator = records.iterator()
    +    val accumulatorList = Array.fill(aggregates.length) {
    +      new JArrayList[Accumulator]()
    +    }
    +
    +    // per each aggregator, collect its accumulators to a list
         while (iterator.hasNext) {
           val record = iterator.next()
    -      aggregates.foreach(_.merge(record, aggregateBuffer))
    +      for (i <- aggregates.indices) {
    +        accumulatorList(i).add(record.getField(accumStartPos + i).asInstanceOf[Accumulator])
    --- End diff --
    
    Use pairwise merging here.



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103801735
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -363,199 +342,112 @@ object AggregateUtil {
               groupingOffsetMapping,
               aggOffsetMapping,
               groupingSetsMapping,
    -          intermediateRowArity,
               outputType.getFieldCount)
           }
         groupReduceFunction
       }
     
       /**
    -    * Create a [[org.apache.flink.api.common.functions.ReduceFunction]] for incremental window
    -    * aggregation.
    -    *
    +    * Create an [[AllWindowFunction]] for non-partitioned window aggregates.
         */
    -  private[flink] def createIncrementalAggregateReduceFunction(
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    -      inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int])
    -    : IncrementalAggregateReduceFunction = {
    -
    -    val aggregates = transformToAggregateFunctions(
    -      namedAggregates.map(_.getKey),inputType,groupings.length)._2
    -
    -    val groupingOffsetMapping =
    -      getGroupingOffsetAndAggOffsetMapping(
    -        namedAggregates,
    -        inputType,
    -        outputType,
    -        groupings)._1
    -
    -    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
    -    val reduceFunction = new IncrementalAggregateReduceFunction(
    -      aggregates,
    -      groupingOffsetMapping,
    -      intermediateRowArity)
    -    reduceFunction
    -  }
    -
    -  /**
    -    * Create an [[AllWindowFunction]] to compute non-partitioned group window aggregates.
    -    */
    -  private[flink] def createAllWindowAggregationFunction(
    +  private[flink] def createAggregationAllWindowFunction(
           window: LogicalWindow,
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    -      inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int],
    -      properties: Seq[NamedWindowProperty])
    -    : AllWindowFunction[Row, Row, DataStreamWindow] = {
    -
    -    val aggFunction =
    -      createAggregateGroupReduceFunction(
    -        namedAggregates,
    -        inputType,
    -        outputType,
    -        groupings,
    -        inGroupingSet = false)
    +      finalRowArity: Int,
    +      properties: Seq[NamedWindowProperty]): AllWindowFunction[Row, Row, DataStreamWindow] = {
     
         if (isTimeWindow(window)) {
           val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    -      new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
    -      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
    +      new IncrementalAggregateAllTimeWindowFunction(
    +        startPos,
    +        endPos,
    +        finalRowArity)
    +        .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
         } else {
    -      new AggregateAllWindowFunction(aggFunction)
    +      new IncrementalAggregateAllWindowFunction(
    +        finalRowArity)
         }
       }
     
       /**
    -    * Create a [[WindowFunction]] to compute partitioned group window aggregates.
    -    *
    +    * Create a [[WindowFunction]] for group window aggregates.
         */
    -  private[flink] def createWindowAggregationFunction(
    +  private[flink] def createAggregationGroupWindowFunction(
           window: LogicalWindow,
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    -      inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int],
    -      properties: Seq[NamedWindowProperty])
    -    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
    -
    -    val aggFunction =
    -      createAggregateGroupReduceFunction(
    -        namedAggregates,
    -        inputType,
    -        outputType,
    -        groupings,
    -        inGroupingSet = false)
    +      finalRowArity: Int,
    +      properties: Seq[NamedWindowProperty]): WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
     
         if (isTimeWindow(window)) {
           val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    -      new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
    -      .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
    +      new IncrementalAggregateTimeWindowFunction(
    +        startPos,
    +        endPos,
    +        finalRowArity)
    +        .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
         } else {
    -      new AggregateWindowFunction(aggFunction)
    +      new IncrementalAggregateWindowFunction(
    +        finalRowArity)
         }
       }
     
    -  /**
    -    * Create an [[AllWindowFunction]] to finalize incrementally pre-computed non-partitioned
    -    * window aggregates.
    -    */
    -  private[flink] def createAllWindowIncrementalAggregationFunction(
    -      window: LogicalWindow,
    +  private[flink] def createDataStreamAggregateFunction(
           namedAggregates: Seq[CalcitePair[AggregateCall, String]],
           inputType: RelDataType,
           outputType: RelDataType,
    -      groupings: Array[Int],
    -      properties: Seq[NamedWindowProperty])
    -    : AllWindowFunction[Row, Row, DataStreamWindow] = {
    +      groupKeysIndex: Array[Int]): (ApiAggregateFunction[Row, Row, Row], RowTypeInfo) = {
     
    -    val aggregates = transformToAggregateFunctions(
    -      namedAggregates.map(_.getKey),inputType,groupings.length)._2
    +    val (aggFields, aggregates) =
    +      transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupKeysIndex.length)
     
    -    val (groupingOffsetMapping, aggOffsetMapping) =
    -      getGroupingOffsetAndAggOffsetMapping(
    -      namedAggregates,
    -      inputType,
    -      outputType,
    -      groupings)
    +    val groupKeysMapping = getGroupKeysMapping(inputType, outputType, groupKeysIndex)
     
    -    val finalRowArity = outputType.getFieldCount
    +    val aggregateMapping = getAggregateMapping(namedAggregates, outputType)
     
    -    if (isTimeWindow(window)) {
    -      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    -      new IncrementalAggregateAllTimeWindowFunction(
    -        aggregates,
    -        groupingOffsetMapping,
    -        aggOffsetMapping,
    -        finalRowArity,
    -        startPos,
    -        endPos)
    -      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
    -    } else {
    -      new IncrementalAggregateAllWindowFunction(
    -        aggregates,
    -        groupingOffsetMapping,
    -        aggOffsetMapping,
    -        finalRowArity)
    +    if (groupKeysMapping.length != groupKeysIndex.length ||
    +      aggregateMapping.length != namedAggregates.length) {
    +      throw new TableException(
    +        "Could not find output field in input data type or aggregate functions.")
         }
    +
    +    val accumulatorRowType = createAccumulatorRowType(inputType, groupKeysIndex, aggregates)
    +    val aggFunction = new AggregateAggFunction(
    +      aggregates,
    +      aggFields,
    +      aggregateMapping,
    +      groupKeysIndex,
    +      groupKeysMapping,
    +      outputType.getFieldCount)
    +
    +    (aggFunction, accumulatorRowType)
       }
     
       /**
    -    * Create a [[WindowFunction]] to finalize incrementally pre-computed window aggregates.
    +    * Return true if all aggregates can be partially merged. False otherwise.
         */
    -  private[flink] def createWindowIncrementalAggregationFunction(
    -      window: LogicalWindow,
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +  private[flink] def doAllSupportPartialMerge(
    +      aggregateCalls: Seq[AggregateCall],
           inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int],
    -      properties: Seq[NamedWindowProperty])
    -    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
    +      groupKeysCount: Int): Boolean = {
     
    -    val aggregates = transformToAggregateFunctions(
    -      namedAggregates.map(_.getKey),inputType,groupings.length)._2
    -
    -    val (groupingOffsetMapping, aggOffsetMapping) =
    -      getGroupingOffsetAndAggOffsetMapping(
    -        namedAggregates,
    -        inputType,
    -        outputType,
    -        groupings)
    -
    -    val finalRowArity = outputType.getFieldCount
    +    val aggregateList = transformToAggregateFunctions(
    +      aggregateCalls,
    +      inputType,
    +      groupKeysCount)._2
     
    -    if (isTimeWindow(window)) {
    -      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    -      new IncrementalAggregateTimeWindowFunction(
    -        aggregates,
    -        groupingOffsetMapping,
    -        aggOffsetMapping,
    -        finalRowArity,
    -        startPos,
    -        endPos)
    -      .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
    -    } else {
    -      new IncrementalAggregateWindowFunction(
    -        aggregates,
    -        groupingOffsetMapping,
    -        aggOffsetMapping,
    -        finalRowArity)
    -    }
    +    doAllSupportPartialMerge(aggregateList)
       }
     
       /**
    -    * Return true if all aggregates can be partially computed. False otherwise.
    +    * Return true if all aggregates can be partially merged. False otherwise.
         */
    -  private[flink] def doAllSupportPartialAggregation(
    -    aggregateCalls: Seq[AggregateCall],
    -    inputType: RelDataType,
    -    groupKeysCount: Int): Boolean = {
    -    transformToAggregateFunctions(
    -      aggregateCalls,
    -      inputType,
    -      groupKeysCount)._2.forall(_.supportPartial)
    +  private[flink] def doAllSupportPartialMerge(
    +      aggregateList: Array[TableAggregateFunction[_ <: Any]]): Boolean = {
    +    var ret: Boolean = true
    --- End diff --
    
    This can be simplified to:
    ```
    aggregateList.forall(ifMethodExistInFunction("merge", _))
    ```
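
The simplification works because `forall` already returns `true` only when the predicate holds for every element, so a mutable `ret` flag and an explicit loop are unnecessary. A minimal sketch, assuming a reflection-based check: `hasMethod`, `allSupportMerge`, `WithMerge`, and `WithoutMerge` are hypothetical names used for illustration and are not the helper referenced in the review.

```scala
object PartialMergeCheckSketch {

  // Hypothetical helper: true if the object's class exposes a public method with this name.
  def hasMethod(name: String, target: AnyRef): Boolean =
    target.getClass.getMethods.exists(_.getName == name)

  // True only if every function in the array has a `merge` method.
  // Note that `forall` on an empty array is true.
  def allSupportMerge(functions: Array[AnyRef]): Boolean =
    functions.forall(hasMethod("merge", _))

  // Hypothetical aggregate functions, one with and one without `merge`.
  class WithMerge { def merge(other: WithMerge): WithMerge = this }
  class WithoutMerge
}
```

A call such as `PartialMergeCheckSketch.allSupportMerge(Array[AnyRef](a, b))` then replaces the loop with a single expression.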



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103818316
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -737,101 +632,121 @@ object AggregateUtil {
               aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
                 sqlTypeName match {
                   case TINYINT =>
    -                new ByteMinAggregate
    +                new ByteMinAggFunction
                   case SMALLINT =>
    -                new ShortMinAggregate
    +                new ShortMinAggFunction
                   case INTEGER =>
    -                new IntMinAggregate
    +                new IntMinAggFunction
                   case BIGINT =>
    -                new LongMinAggregate
    +                new LongMinAggFunction
                   case FLOAT =>
    -                new FloatMinAggregate
    +                new FloatMinAggFunction
                   case DOUBLE =>
    -                new DoubleMinAggregate
    +                new DoubleMinAggFunction
                   case DECIMAL =>
    -                new DecimalMinAggregate
    +                new DecimalMinAggFunction
                   case BOOLEAN =>
    -                new BooleanMinAggregate
    +                new BooleanMinAggFunction
                   case sqlType: SqlTypeName =>
                     throw new TableException("Min aggregate does no support type:" + sqlType)
                 }
               } else {
                 sqlTypeName match {
                   case TINYINT =>
    -                new ByteMaxAggregate
    +                new ByteMaxAggFunction
                   case SMALLINT =>
    -                new ShortMaxAggregate
    +                new ShortMaxAggFunction
                   case INTEGER =>
    -                new IntMaxAggregate
    +                new IntMaxAggFunction
                   case BIGINT =>
    -                new LongMaxAggregate
    +                new LongMaxAggFunction
                   case FLOAT =>
    -                new FloatMaxAggregate
    +                new FloatMaxAggFunction
                   case DOUBLE =>
    -                new DoubleMaxAggregate
    +                new DoubleMaxAggFunction
                   case DECIMAL =>
    -                new DecimalMaxAggregate
    +                new DecimalMaxAggFunction
                   case BOOLEAN =>
    -                new BooleanMaxAggregate
    +                new BooleanMaxAggFunction
                   case sqlType: SqlTypeName =>
                     throw new TableException("Max aggregate does no support type:" + sqlType)
                 }
               }
             }
             case _: SqlCountAggFunction =>
    -          aggregates(index) = new CountAggregate
    +          aggregates(index) = new CountAggFunction
             case unSupported: SqlAggFunction =>
               throw new TableException("unsupported Function: " + unSupported.getName)
           }
    -      setAggregateDataOffset(index)
    -    }
    -
    -    // set the aggregate intermediate data start index in Row, and update current value.
    -    def setAggregateDataOffset(index: Int): Unit = {
    -      aggregates(index).setAggOffsetInRow(aggOffset)
    -      aggOffset += aggregates(index).intermediateDataType.length
         }
     
         (aggFieldIndexes, aggregates)
       }
     
    -  private def createAggregateBufferDataType(
    -    groupings: Array[Int],
    -    aggregates: Array[Aggregate[_]],
    -    inputType: RelDataType,
    -    windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
    +  private def createDataSetAggregateBufferDataType(
    +      groupings: Array[Int],
    +      aggregates: Array[TableAggregateFunction[_]],
    +      inputType: RelDataType,
    +      windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
     
         // get the field data types of group keys.
    -    val groupingTypes: Seq[TypeInformation[_]] = groupings
    -      .map(inputType.getFieldList.get(_).getType)
    -      .map(FlinkTypeFactory.toTypeInfo)
    +    val groupingTypes: Seq[TypeInformation[_]] =
    +      groupings
    +        .map(inputType.getFieldList.get(_).getType)
    +        .map(FlinkTypeFactory.toTypeInfo)
     
         // get all field data types of all intermediate aggregates
    -    val aggTypes: Seq[TypeInformation[_]] = aggregates.flatMap(_.intermediateDataType)
    +    val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg =>
    +      val clazz: Class[_] = agg.getClass
    --- End diff --
    
    I played around with this.
    I think we can make it work without a `getAccumulatorType()` method, but we have to change a few parts.
    First of all, the accumulators need to be moved out of the `AggregationFunction` and become regular "top-level" classes. Once this is done, `TypeInformation.of(clazz)` should detect them as tuples and create a `TupleTypeInfo`. However, the fields inside are still of `GenericType`. I figured out that it helps to use the Java boxed types instead of Scala's types, i.e., a `JTuple1[JLong]` (with `JLong = java.lang.Long`) will result in a correct `TupleTypeInfo` with a single Long field.



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103733963
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala ---
    @@ -74,22 +75,28 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
           new JArrayList[Accumulator]()
    --- End diff --
    
    Move this out of `reduce()`.



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103433985
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala ---
    @@ -119,110 +119,54 @@ class DataStreamAggregate(
           s"select: ($aggString)"
         val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
     
    -    val mapFunction = AggregateUtil.createPrepareMapFunction(
    -      namedAggregates,
    -      grouping,
    -      inputType)
    -
    -    val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
    -
    -
    -    // check whether all aggregates support partial aggregate
    -    if (AggregateUtil.doAllSupportPartialAggregation(
    -          namedAggregates.map(_.getKey),
    -          inputType,
    -          grouping.length)) {
    -      // do Incremental Aggregation
    -      val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
    -        namedAggregates,
    -        inputType,
    -        getRowType,
    -        grouping)
    -      // grouped / keyed aggregation
    -      if (groupingKeys.length > 0) {
    -        val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
    -          window,
    -          namedAggregates,
    -          inputType,
    -          rowRelDataType,
    -          grouping,
    -          namedProperties)
    +    // grouped / keyed aggregation
    +    if (groupingKeys.length > 0) {
    +      val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
    --- End diff --
    
    AggregationFunction -> WindowFunction?



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103475198
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala ---
    @@ -91,9 +93,13 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
     
         var windowStart: java.lang.Long = null
         var windowEnd: java.lang.Long = null
    -    var currentRowTime:java.lang.Long  = null
    +    var currentRowTime: java.lang.Long = null
     
         val iterator = records.iterator()
    +    val accumulatorList = Array.fill(aggregates.length) {
    --- End diff --
    
    Use a single `JArrayList(2)` for pairwise merging and make it a member variable



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103801835
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -363,199 +342,112 @@ object AggregateUtil {
               groupingOffsetMapping,
               aggOffsetMapping,
               groupingSetsMapping,
    -          intermediateRowArity,
               outputType.getFieldCount)
           }
         groupReduceFunction
       }
     
       /**
    -    * Create a [[org.apache.flink.api.common.functions.ReduceFunction]] for incremental window
    -    * aggregation.
    -    *
    +    * Create an [[AllWindowFunction]] for non-partitioned window aggregates.
         */
    -  private[flink] def createIncrementalAggregateReduceFunction(
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    -      inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int])
    -    : IncrementalAggregateReduceFunction = {
    -
    -    val aggregates = transformToAggregateFunctions(
    -      namedAggregates.map(_.getKey),inputType,groupings.length)._2
    -
    -    val groupingOffsetMapping =
    -      getGroupingOffsetAndAggOffsetMapping(
    -        namedAggregates,
    -        inputType,
    -        outputType,
    -        groupings)._1
    -
    -    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
    -    val reduceFunction = new IncrementalAggregateReduceFunction(
    -      aggregates,
    -      groupingOffsetMapping,
    -      intermediateRowArity)
    -    reduceFunction
    -  }
    -
    -  /**
    -    * Create an [[AllWindowFunction]] to compute non-partitioned group window aggregates.
    -    */
    -  private[flink] def createAllWindowAggregationFunction(
    +  private[flink] def createAggregationAllWindowFunction(
           window: LogicalWindow,
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    -      inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int],
    -      properties: Seq[NamedWindowProperty])
    -    : AllWindowFunction[Row, Row, DataStreamWindow] = {
    -
    -    val aggFunction =
    -      createAggregateGroupReduceFunction(
    -        namedAggregates,
    -        inputType,
    -        outputType,
    -        groupings,
    -        inGroupingSet = false)
    +      finalRowArity: Int,
    +      properties: Seq[NamedWindowProperty]): AllWindowFunction[Row, Row, DataStreamWindow] = {
     
         if (isTimeWindow(window)) {
           val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    -      new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
    -      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
    +      new IncrementalAggregateAllTimeWindowFunction(
    +        startPos,
    +        endPos,
    +        finalRowArity)
    +        .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
         } else {
    -      new AggregateAllWindowFunction(aggFunction)
    +      new IncrementalAggregateAllWindowFunction(
    +        finalRowArity)
         }
       }
     
       /**
    -    * Create a [[WindowFunction]] to compute partitioned group window aggregates.
    -    *
    +    * Create a [[WindowFunction]] for group window aggregates.
         */
    -  private[flink] def createWindowAggregationFunction(
    +  private[flink] def createAggregationGroupWindowFunction(
           window: LogicalWindow,
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    -      inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int],
    -      properties: Seq[NamedWindowProperty])
    -    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
    -
    -    val aggFunction =
    -      createAggregateGroupReduceFunction(
    -        namedAggregates,
    -        inputType,
    -        outputType,
    -        groupings,
    -        inGroupingSet = false)
    +      finalRowArity: Int,
    +      properties: Seq[NamedWindowProperty]): WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
     
         if (isTimeWindow(window)) {
           val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    -      new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
    -      .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
    +      new IncrementalAggregateTimeWindowFunction(
    +        startPos,
    +        endPos,
    +        finalRowArity)
    +        .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
         } else {
    -      new AggregateWindowFunction(aggFunction)
    +      new IncrementalAggregateWindowFunction(
    +        finalRowArity)
         }
       }
     
    -  /**
    -    * Create an [[AllWindowFunction]] to finalize incrementally pre-computed non-partitioned
    -    * window aggregates.
    -    */
    -  private[flink] def createAllWindowIncrementalAggregationFunction(
    -      window: LogicalWindow,
    +  private[flink] def createDataStreamAggregateFunction(
           namedAggregates: Seq[CalcitePair[AggregateCall, String]],
           inputType: RelDataType,
           outputType: RelDataType,
    -      groupings: Array[Int],
    -      properties: Seq[NamedWindowProperty])
    -    : AllWindowFunction[Row, Row, DataStreamWindow] = {
    +      groupKeysIndex: Array[Int]): (ApiAggregateFunction[Row, Row, Row], RowTypeInfo) = {
     
    -    val aggregates = transformToAggregateFunctions(
    -      namedAggregates.map(_.getKey),inputType,groupings.length)._2
    +    val (aggFields, aggregates) =
    +      transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupKeysIndex.length)
     
    -    val (groupingOffsetMapping, aggOffsetMapping) =
    -      getGroupingOffsetAndAggOffsetMapping(
    -      namedAggregates,
    -      inputType,
    -      outputType,
    -      groupings)
    +    val groupKeysMapping = getGroupKeysMapping(inputType, outputType, groupKeysIndex)
     
    -    val finalRowArity = outputType.getFieldCount
    +    val aggregateMapping = getAggregateMapping(namedAggregates, outputType)
     
    -    if (isTimeWindow(window)) {
    -      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    -      new IncrementalAggregateAllTimeWindowFunction(
    -        aggregates,
    -        groupingOffsetMapping,
    -        aggOffsetMapping,
    -        finalRowArity,
    -        startPos,
    -        endPos)
    -      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
    -    } else {
    -      new IncrementalAggregateAllWindowFunction(
    -        aggregates,
    -        groupingOffsetMapping,
    -        aggOffsetMapping,
    -        finalRowArity)
    +    if (groupKeysMapping.length != groupKeysIndex.length ||
    +      aggregateMapping.length != namedAggregates.length) {
    +      throw new TableException(
    +        "Could not find output field in input data type or aggregate functions.")
         }
    +
    +    val accumulatorRowType = createAccumulatorRowType(inputType, groupKeysIndex, aggregates)
    +    val aggFunction = new AggregateAggFunction(
    +      aggregates,
    +      aggFields,
    +      aggregateMapping,
    +      groupKeysIndex,
    +      groupKeysMapping,
    +      outputType.getFieldCount)
    +
    +    (aggFunction, accumulatorRowType)
       }
     
       /**
    -    * Create a [[WindowFunction]] to finalize incrementally pre-computed window aggregates.
    +    * Return true if all aggregates can be partially merged. False otherwise.
         */
    -  private[flink] def createWindowIncrementalAggregationFunction(
    -      window: LogicalWindow,
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +  private[flink] def doAllSupportPartialMerge(
    +      aggregateCalls: Seq[AggregateCall],
           inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int],
    -      properties: Seq[NamedWindowProperty])
    -    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
    +      groupKeysCount: Int): Boolean = {
     
    -    val aggregates = transformToAggregateFunctions(
    -      namedAggregates.map(_.getKey),inputType,groupings.length)._2
    -
    -    val (groupingOffsetMapping, aggOffsetMapping) =
    -      getGroupingOffsetAndAggOffsetMapping(
    -        namedAggregates,
    -        inputType,
    -        outputType,
    -        groupings)
    -
    -    val finalRowArity = outputType.getFieldCount
    +    val aggregateList = transformToAggregateFunctions(
    +      aggregateCalls,
    +      inputType,
    +      groupKeysCount)._2
     
    -    if (isTimeWindow(window)) {
    -      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    -      new IncrementalAggregateTimeWindowFunction(
    -        aggregates,
    -        groupingOffsetMapping,
    -        aggOffsetMapping,
    -        finalRowArity,
    -        startPos,
    -        endPos)
    -      .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
    -    } else {
    -      new IncrementalAggregateWindowFunction(
    -        aggregates,
    -        groupingOffsetMapping,
    -        aggOffsetMapping,
    -        finalRowArity)
    -    }
    +    doAllSupportPartialMerge(aggregateList)
       }
     
       /**
    -    * Return true if all aggregates can be partially computed. False otherwise.
    +    * Return true if all aggregates can be partially merged. False otherwise.
         */
    -  private[flink] def doAllSupportPartialAggregation(
    -    aggregateCalls: Seq[AggregateCall],
    -    inputType: RelDataType,
    -    groupKeysCount: Int): Boolean = {
    -    transformToAggregateFunctions(
    -      aggregateCalls,
    -      inputType,
    -      groupKeysCount)._2.forall(_.supportPartial)
    +  private[flink] def doAllSupportPartialMerge(
    +      aggregateList: Array[TableAggregateFunction[_ <: Any]]): Boolean = {
    +    var ret: Boolean = true
    +    var i: Int = 0
    +    while (i < aggregateList.length && ret) {
    +      ret = ifMethodExitInFunction("merge", aggregateList(i))
    --- End diff --
    
    Last check overrides all previous ones.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103445538
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util.{ArrayList => JArrayList, List => JList}
    +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Aggregate Function used for the aggregate operator in
    +  * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
    +  *
    +  * @param aggregates       the list of all [[org.apache.flink.table.functions.AggregateFunction]]
    +  *                         used for this aggregation
    +  * @param aggFieldsIndex   the position (in the input Row) of the input value for each aggregate
    +  * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
    +  *                         output row => the index of the aggregate) for all the aggregates
    +  * @param groupKeysIndex   the position (in the input Row) of grouping keys
    +  * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
    +  *                         output row => the index of grouping key) for all the grouping keys
    +  * @param finalRowArity    the arity of the final row
    +  */
    +class AggregateAggFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFieldsIndex: Array[Int],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val groupKeysIndex: Array[Int],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val finalRowArity: Int)
    +  extends ApiAggFunction[Row, Row, Row] {
    +
    +  override def createAccumulator(): Row = {
    +    val accumulatorRow: Row = new Row(groupKeysIndex.length + aggregates.length)
    +
    +    for (i <- aggregates.indices) {
    +      accumulatorRow.setField(groupKeysIndex.length + i, aggregates(i).createAccumulator())
    +    }
    +    accumulatorRow
    +  }
    +
    +  override def add(value: Row, accumulatorRow: Row) = {
    +    for (i <- groupKeysIndex.indices) {
    +      accumulatorRow.setField(i, value.getField(groupKeysIndex(i)))
    +    }
    +
    +    for (i <- aggregates.indices) {
    +      val accumulator =
    +        accumulatorRow.getField(i + groupKeysIndex.length).asInstanceOf[Accumulator]
    +      val v = value.getField(aggFieldsIndex(i))
    +      aggregates(i).accumulate(accumulator, v)
    +    }
    +  }
    +
    +  override def getResult(accumulatorRow: Row): Row = {
    +    val output = new Row(finalRowArity)
    +
    +    groupKeysMapping.foreach {
    --- End diff --
    
    skip grouping keys



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103733313
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala ---
    @@ -85,6 +86,16 @@ class DataSetTumbleCountWindowAggReduceGroupFunction(
           }
           count += 1
     
    +      // for every maxMergeLen accumulators, we merge them into one
    +      if (count % maxMergeLen == 0) {
    +        for (i <- aggregates.indices) {
    +          val agg = aggregates(i)
    +          val accumulator = agg.merge(accumulatorList(i))
    --- End diff --
    
    make `accumulatorList` a member of the function and move the initialization out of `reduce()`
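
A rough sketch of what this suggestion could look like, not the PR's actual code. `SumAgg` and `BatchedSumFunction` are hypothetical, simplified stand-ins (accumulators are one-element `Long` arrays); the point is only that the list is allocated once as a member and reset at the start of each `reduce()` call, while the periodic merge from the diff keeps it short.

```scala
import java.util.{ArrayList => JArrayList, List => JList}

// Hypothetical, simplified aggregate: the accumulator is a one-element array.
class SumAgg {
  def createAccumulator(): Array[Long] = Array(0L)

  // Merge a list of accumulators into a fresh one.
  def merge(accs: JList[Array[Long]]): Array[Long] = {
    val out = createAccumulator()
    var i = 0
    while (i < accs.size) { out(0) += accs.get(i)(0); i += 1 }
    out
  }

  def getValue(acc: Array[Long]): Long = acc(0)
}

// Hypothetical function class; not a Flink RichGroupReduceFunction.
class BatchedSumFunction(maxMergeLen: Int) {
  private val agg = new SumAgg

  // Member: allocated once per function instance, not once per reduce() call.
  private val accumulatorList: JList[Array[Long]] = new JArrayList[Array[Long]]()

  def reduce(records: Iterable[Array[Long]]): Long = {
    accumulatorList.clear() // drop state left over from the previous group
    var count = 0
    for (record <- records) {
      accumulatorList.add(record)
      count += 1
      // for every maxMergeLen accumulators, merge them into one
      if (count % maxMergeLen == 0) {
        val merged = agg.merge(accumulatorList)
        accumulatorList.clear()
        accumulatorList.add(merged)
      }
    }
    agg.getValue(agg.merge(accumulatorList))
  }
}
```

Because the list is now shared across calls, the `clear()` at the top of `reduce()` is what keeps one group's accumulators from leaking into the next.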



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103734677
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala ---
    @@ -99,8 +106,8 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction(
         // get final aggregate value and set to output.
         aggregateMapping.foreach {
           case (after, previous) => {
    -        val accumulator =
    -          aggregateBuffer.getField(accumStartPos + previous).asInstanceOf[Accumulator]
    +        val agg = aggregates(previous)
    +        val accumulator = agg.merge(accumulatorList(previous))
             val result = aggregates(previous).getValue(accumulator)
    --- End diff --
    
    `aggregates(previous)` -> `agg`



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103839943
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -363,199 +342,112 @@ object AggregateUtil {
               groupingOffsetMapping,
               aggOffsetMapping,
               groupingSetsMapping,
    -          intermediateRowArity,
               outputType.getFieldCount)
           }
         groupReduceFunction
       }
     
       /**
    -    * Create a [[org.apache.flink.api.common.functions.ReduceFunction]] for incremental window
    -    * aggregation.
    -    *
    +    * Create an [[AllWindowFunction]] for non-partitioned window aggregates.
         */
    -  private[flink] def createIncrementalAggregateReduceFunction(
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    -      inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int])
    -    : IncrementalAggregateReduceFunction = {
    -
    -    val aggregates = transformToAggregateFunctions(
    -      namedAggregates.map(_.getKey),inputType,groupings.length)._2
    -
    -    val groupingOffsetMapping =
    -      getGroupingOffsetAndAggOffsetMapping(
    -        namedAggregates,
    -        inputType,
    -        outputType,
    -        groupings)._1
    -
    -    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
    -    val reduceFunction = new IncrementalAggregateReduceFunction(
    -      aggregates,
    -      groupingOffsetMapping,
    -      intermediateRowArity)
    -    reduceFunction
    -  }
    -
    -  /**
    -    * Create an [[AllWindowFunction]] to compute non-partitioned group window aggregates.
    -    */
    -  private[flink] def createAllWindowAggregationFunction(
    +  private[flink] def createAggregationAllWindowFunction(
           window: LogicalWindow,
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    -      inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int],
    -      properties: Seq[NamedWindowProperty])
    -    : AllWindowFunction[Row, Row, DataStreamWindow] = {
    -
    -    val aggFunction =
    -      createAggregateGroupReduceFunction(
    -        namedAggregates,
    -        inputType,
    -        outputType,
    -        groupings,
    -        inGroupingSet = false)
    +      finalRowArity: Int,
    +      properties: Seq[NamedWindowProperty]): AllWindowFunction[Row, Row, DataStreamWindow] = {
     
         if (isTimeWindow(window)) {
           val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    -      new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
    -      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
    +      new IncrementalAggregateAllTimeWindowFunction(
    +        startPos,
    +        endPos,
    +        finalRowArity)
    +        .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
         } else {
    -      new AggregateAllWindowFunction(aggFunction)
    +      new IncrementalAggregateAllWindowFunction(
    +        finalRowArity)
         }
       }
     
       /**
    -    * Create a [[WindowFunction]] to compute partitioned group window aggregates.
    -    *
    +    * Create a [[WindowFunction]] for group window aggregates.
         */
    -  private[flink] def createWindowAggregationFunction(
    +  private[flink] def createAggregationGroupWindowFunction(
           window: LogicalWindow,
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    -      inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int],
    -      properties: Seq[NamedWindowProperty])
    -    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
    -
    -    val aggFunction =
    -      createAggregateGroupReduceFunction(
    -        namedAggregates,
    -        inputType,
    -        outputType,
    -        groupings,
    -        inGroupingSet = false)
    +      finalRowArity: Int,
    +      properties: Seq[NamedWindowProperty]): WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
     
         if (isTimeWindow(window)) {
           val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    -      new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
    -      .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
    +      new IncrementalAggregateTimeWindowFunction(
    +        startPos,
    +        endPos,
    +        finalRowArity)
    +        .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
         } else {
    -      new AggregateWindowFunction(aggFunction)
    +      new IncrementalAggregateWindowFunction(
    +        finalRowArity)
         }
       }
     
    -  /**
    -    * Create an [[AllWindowFunction]] to finalize incrementally pre-computed non-partitioned
    -    * window aggregates.
    -    */
    -  private[flink] def createAllWindowIncrementalAggregationFunction(
    -      window: LogicalWindow,
    +  private[flink] def createDataStreamAggregateFunction(
           namedAggregates: Seq[CalcitePair[AggregateCall, String]],
           inputType: RelDataType,
           outputType: RelDataType,
    -      groupings: Array[Int],
    -      properties: Seq[NamedWindowProperty])
    -    : AllWindowFunction[Row, Row, DataStreamWindow] = {
    +      groupKeysIndex: Array[Int]): (ApiAggregateFunction[Row, Row, Row], RowTypeInfo) = {
     
    -    val aggregates = transformToAggregateFunctions(
    -      namedAggregates.map(_.getKey),inputType,groupings.length)._2
    +    val (aggFields, aggregates) =
    +      transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupKeysIndex.length)
     
    -    val (groupingOffsetMapping, aggOffsetMapping) =
    -      getGroupingOffsetAndAggOffsetMapping(
    -      namedAggregates,
    -      inputType,
    -      outputType,
    -      groupings)
    +    val groupKeysMapping = getGroupKeysMapping(inputType, outputType, groupKeysIndex)
     
    -    val finalRowArity = outputType.getFieldCount
    +    val aggregateMapping = getAggregateMapping(namedAggregates, outputType)
     
    -    if (isTimeWindow(window)) {
    -      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    -      new IncrementalAggregateAllTimeWindowFunction(
    -        aggregates,
    -        groupingOffsetMapping,
    -        aggOffsetMapping,
    -        finalRowArity,
    -        startPos,
    -        endPos)
    -      .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
    -    } else {
    -      new IncrementalAggregateAllWindowFunction(
    -        aggregates,
    -        groupingOffsetMapping,
    -        aggOffsetMapping,
    -        finalRowArity)
    +    if (groupKeysMapping.length != groupKeysIndex.length ||
    +      aggregateMapping.length != namedAggregates.length) {
    +      throw new TableException(
    +        "Could not find output field in input data type or aggregate functions.")
         }
    +
    +    val accumulatorRowType = createAccumulatorRowType(inputType, groupKeysIndex, aggregates)
    +    val aggFunction = new AggregateAggFunction(
    +      aggregates,
    +      aggFields,
    +      aggregateMapping,
    +      groupKeysIndex,
    +      groupKeysMapping,
    +      outputType.getFieldCount)
    +
    +    (aggFunction, accumulatorRowType)
       }
     
       /**
    -    * Create a [[WindowFunction]] to finalize incrementally pre-computed window aggregates.
    +    * Return true if all aggregates can be partially merged. False otherwise.
         */
    -  private[flink] def createWindowIncrementalAggregationFunction(
    -      window: LogicalWindow,
    -      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +  private[flink] def doAllSupportPartialMerge(
    +      aggregateCalls: Seq[AggregateCall],
           inputType: RelDataType,
    -      outputType: RelDataType,
    -      groupings: Array[Int],
    -      properties: Seq[NamedWindowProperty])
    -    : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
    +      groupKeysCount: Int): Boolean = {
     
    -    val aggregates = transformToAggregateFunctions(
    -      namedAggregates.map(_.getKey),inputType,groupings.length)._2
    -
    -    val (groupingOffsetMapping, aggOffsetMapping) =
    -      getGroupingOffsetAndAggOffsetMapping(
    -        namedAggregates,
    -        inputType,
    -        outputType,
    -        groupings)
    -
    -    val finalRowArity = outputType.getFieldCount
    +    val aggregateList = transformToAggregateFunctions(
    +      aggregateCalls,
    +      inputType,
    +      groupKeysCount)._2
     
    -    if (isTimeWindow(window)) {
    -      val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
    -      new IncrementalAggregateTimeWindowFunction(
    -        aggregates,
    -        groupingOffsetMapping,
    -        aggOffsetMapping,
    -        finalRowArity,
    -        startPos,
    -        endPos)
    -      .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
    -    } else {
    -      new IncrementalAggregateWindowFunction(
    -        aggregates,
    -        groupingOffsetMapping,
    -        aggOffsetMapping,
    -        finalRowArity)
    -    }
    +    doAllSupportPartialMerge(aggregateList)
       }
     
       /**
    -    * Return true if all aggregates can be partially computed. False otherwise.
    +    * Return true if all aggregates can be partially merged. False otherwise.
         */
    -  private[flink] def doAllSupportPartialAggregation(
    -    aggregateCalls: Seq[AggregateCall],
    -    inputType: RelDataType,
    -    groupKeysCount: Int): Boolean = {
    -    transformToAggregateFunctions(
    -      aggregateCalls,
    -      inputType,
    -      groupKeysCount)._2.forall(_.supportPartial)
    +  private[flink] def doAllSupportPartialMerge(
    +      aggregateList: Array[TableAggregateFunction[_ <: Any]]): Boolean = {
    +    var ret: Boolean = true
    +    var i: Int = 0
    +    while (i < aggregateList.length && ret) {
    +      ret = ifMethodExitInFunction("merge", aggregateList(i))
    --- End diff --
    
    The loop exits as soon as `ret` becomes false, so a later check cannot override an earlier one. This should behave exactly like `forall`. (I intended to avoid traversing the entire list for performance, but it seems `forall` behaves the same way.)
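
A minimal sketch of the equivalence discussed here. It assumes a hypothetical `hasMerge` helper in place of `ifMethodExitInFunction`, and an `i += 1` increment that the quoted hunk cuts off before showing. Both forms stop at the first element that fails the check.

```scala
object PartialMergeCheck {
  // Hypothetical stand-in for the reflective "has a merge method" check.
  def hasMerge(obj: AnyRef): Boolean =
    obj.getClass.getMethods.exists(_.getName == "merge")

  // Two toy classes used only for illustration.
  class WithMerge { def merge(other: WithMerge): WithMerge = this }
  class WithoutMerge

  // Loop form, as in the diff: the `&& ret` condition ends the loop on the first failure.
  def allSupportMergeLoop(aggs: Array[AnyRef]): Boolean = {
    var ret = true
    var i = 0
    while (i < aggs.length && ret) {
      ret = hasMerge(aggs(i))
      i += 1 // assumed; not visible in the quoted hunk
    }
    ret
  }

  // Equivalent form: forall also stops at the first element failing the predicate.
  def allSupportMergeForall(aggs: Array[AnyRef]): Boolean =
    aggs.forall(hasMerge)
}
```

Without the increment, or without `&& ret` in the loop condition, the two forms would no longer agree, which is the scenario the original review comment was worried about.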



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103435442
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util.{ArrayList => JArrayList, List => JList}
    +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
    --- End diff --
    
    Rename `ApiAggFunction` to `DataStreamAggFunc`?
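
For context, a small sketch of the import-renaming mechanism this comment relies on. It uses only standard-library names; `ImportAliasExample` and `toJava` are hypothetical. The alias is purely local to the importing file, so choosing `DataStreamAggFunc` over `ApiAggFunction` would only change how the imported class reads at its use sites.

```scala
// Both Scala and java.util have a "List"; renaming on import keeps them apart.
import java.util.{ArrayList => JArrayList, List => JList}

object ImportAliasExample {
  // `List` is Scala's immutable list, `JList` is java.util.List.
  def toJava(xs: List[Int]): JList[Int] = {
    val out = new JArrayList[Int]()
    xs.foreach(x => out.add(x))
    out
  }
}
```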



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103728434
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceCombineFunction.scala ---
    @@ -69,11 +69,25 @@ class AggregateReduceCombineFunction(
           new JArrayList[Accumulator]()
         }
     
    +    var count:Int = 0
    --- End diff --
    
    space after `:`



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103834968
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala ---
    @@ -100,8 +99,10 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
           new JArrayList[Accumulator]()
         }
     
    +    var count:Int = 0
         while (iterator.hasNext) {
           val record = iterator.next()
    +      count += 1
    --- End diff --
    
    Good catch!



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103806213
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -737,101 +632,121 @@ object AggregateUtil {
               aggregates(index) = if (sqlMinMaxFunction.getKind == SqlKind.MIN) {
                 sqlTypeName match {
                   case TINYINT =>
    -                new ByteMinAggregate
    +                new ByteMinAggFunction
                   case SMALLINT =>
    -                new ShortMinAggregate
    +                new ShortMinAggFunction
                   case INTEGER =>
    -                new IntMinAggregate
    +                new IntMinAggFunction
                   case BIGINT =>
    -                new LongMinAggregate
    +                new LongMinAggFunction
                   case FLOAT =>
    -                new FloatMinAggregate
    +                new FloatMinAggFunction
                   case DOUBLE =>
    -                new DoubleMinAggregate
    +                new DoubleMinAggFunction
                   case DECIMAL =>
    -                new DecimalMinAggregate
    +                new DecimalMinAggFunction
                   case BOOLEAN =>
    -                new BooleanMinAggregate
    +                new BooleanMinAggFunction
                   case sqlType: SqlTypeName =>
                     throw new TableException("Min aggregate does no support type:" + sqlType)
                 }
               } else {
                 sqlTypeName match {
                   case TINYINT =>
    -                new ByteMaxAggregate
    +                new ByteMaxAggFunction
                   case SMALLINT =>
    -                new ShortMaxAggregate
    +                new ShortMaxAggFunction
                   case INTEGER =>
    -                new IntMaxAggregate
    +                new IntMaxAggFunction
                   case BIGINT =>
    -                new LongMaxAggregate
    +                new LongMaxAggFunction
                   case FLOAT =>
    -                new FloatMaxAggregate
    +                new FloatMaxAggFunction
                   case DOUBLE =>
    -                new DoubleMaxAggregate
    +                new DoubleMaxAggFunction
                   case DECIMAL =>
    -                new DecimalMaxAggregate
    +                new DecimalMaxAggFunction
                   case BOOLEAN =>
    -                new BooleanMaxAggregate
    +                new BooleanMaxAggFunction
                   case sqlType: SqlTypeName =>
                     throw new TableException("Max aggregate does no support type:" + sqlType)
                 }
               }
             }
             case _: SqlCountAggFunction =>
    -          aggregates(index) = new CountAggregate
    +          aggregates(index) = new CountAggFunction
             case unSupported: SqlAggFunction =>
               throw new TableException("unsupported Function: " + unSupported.getName)
           }
    -      setAggregateDataOffset(index)
    -    }
    -
    -    // set the aggregate intermediate data start index in Row, and update current value.
    -    def setAggregateDataOffset(index: Int): Unit = {
    -      aggregates(index).setAggOffsetInRow(aggOffset)
    -      aggOffset += aggregates(index).intermediateDataType.length
         }
     
         (aggFieldIndexes, aggregates)
       }
     
    -  private def createAggregateBufferDataType(
    -    groupings: Array[Int],
    -    aggregates: Array[Aggregate[_]],
    -    inputType: RelDataType,
    -    windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
    +  private def createDataSetAggregateBufferDataType(
    +      groupings: Array[Int],
    +      aggregates: Array[TableAggregateFunction[_]],
    +      inputType: RelDataType,
    +      windowKeyTypes: Option[Array[TypeInformation[_]]] = None): RowTypeInfo = {
     
         // get the field data types of group keys.
    -    val groupingTypes: Seq[TypeInformation[_]] = groupings
    -      .map(inputType.getFieldList.get(_).getType)
    -      .map(FlinkTypeFactory.toTypeInfo)
    +    val groupingTypes: Seq[TypeInformation[_]] =
    +      groupings
    +        .map(inputType.getFieldList.get(_).getType)
    +        .map(FlinkTypeFactory.toTypeInfo)
     
         // get all field data types of all intermediate aggregates
    -    val aggTypes: Seq[TypeInformation[_]] = aggregates.flatMap(_.intermediateDataType)
    +    val aggTypes: Seq[TypeInformation[_]] = aggregates.map { agg =>
    +      val clazz: Class[_] = agg.getClass
    --- End diff --
    
    We need to obtain the `TypeInformation` of the `Accumulator` here, not the type of the `AggregateFunction`.
    We might need to add a `getAccumulatorType()` method to the `AggregateFunction` if we cannot extract the type from the object returned by `AggregateFunction.createAccumulator()`.
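
The distinction in the comment above can be illustrated with a minimal, self-contained sketch. All names here (`SketchAccumulator`, `SketchAggFunction`, `accumulatorClass`) are hypothetical stand-ins, not Flink classes, and plain `Class[_]` is used in place of `TypeInformation` so the example runs without Flink on the classpath. It only shows why the buffer field types should be derived from the accumulator rather than from `agg.getClass`.

```scala
// Hypothetical stand-ins for illustration only; these are not Flink's classes.
trait SketchAccumulator

abstract class SketchAggFunction[T] {
  def createAccumulator(): SketchAccumulator

  // Hypothetical hook, analogous to the suggested getAccumulatorType().
  // The default inspects a freshly created accumulator instance; a function
  // could override it when the type cannot be extracted that way.
  def accumulatorClass: Class[_] = createAccumulator().getClass
}

class SumAccumulator extends SketchAccumulator {
  var sum: Long = 0L
}

class LongSumAggFunction extends SketchAggFunction[Long] {
  override def createAccumulator(): SketchAccumulator = new SumAccumulator
}

object AccumulatorTypeSketch {
  // The buffer field types come from the accumulators,
  // not from the classes of the aggregate functions themselves.
  def bufferFieldClasses(aggregates: Array[SketchAggFunction[_]]): Seq[Class[_]] =
    aggregates.toSeq.map(_.accumulatorClass)
}
```

In the real code the result would presumably be a `TypeInformation` per accumulator; how that is obtained is left open by the comment.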


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3423: [FLINK-5768] [table] Apply new aggregation functions for ...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on the issue:

    https://github.com/apache/flink/pull/3423
  
    @fhueske I pushed a commit. Please take a look. Please note that this PR will close both FLINK-5768 and FLINK-5769. Thanks!



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103732317
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala ---
    @@ -100,8 +99,10 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
           new JArrayList[Accumulator]()
         }
     
    +    var count:Int = 0
    --- End diff --
    
    Missing space after the colon: `var count: Int = 0`.



[GitHub] flink issue #3423: [FLINK-5768] [table] Apply new aggregation functions for ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3423
  
    Thanks for the update, @shaoxuan-wang. The PR looks good to merge.
    I will do some final tests and run another build.
    
    Thanks, Fabian



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103474983
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala ---
    @@ -104,21 +110,27 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
             // calculate the current window and open a new window
             if (null != windowEnd) {
               // evaluate and emit the current window's result.
    -          doEvaluateAndCollect(out, windowStart, windowEnd)
    +          doEvaluateAndCollect(out, accumulatorList, windowStart, windowEnd)
    +
    +          // clear the accumulator list for all aggregate
    +          for (i <- aggregates.indices) {
    +            accumulatorList(i).clear()
    +          }
             } else {
               // set group keys value to final output.
               groupKeysMapping.foreach {
                 case (after, previous) =>
                   output.setField(after, record.getField(previous))
               }
             }
    -        // initiate intermediate aggregate value.
    -        aggregates.foreach(_.initiate(aggregateBuffer))
    +
             windowStart = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
           }
     
    -      // merge intermediate aggregate value to the buffered value.
    -      aggregates.foreach(_.merge(record, aggregateBuffer))
    +      // collect the accumulators for each aggregate
    +      for (i <- aggregates.indices) {
    +        accumulatorList(i).add(record.getField(accumStartPos + i).asInstanceOf[Accumulator])
    --- End diff --
    
    Use pairwise merging here instead of collecting all accumulators in a list.



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103435531
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util.{ArrayList => JArrayList, List => JList}
    +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Aggregate Function used for the aggregate operator in
    +  * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
    +  *
    +  * @param aggregates       the list of all [[org.apache.flink.table.functions.AggregateFunction]]
    +  *                         used for this aggregation
    +  * @param aggFieldsIndex   the position (in the input Row) of the input value for each aggregate
    +  * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
    +  *                         output row => the index of the aggregate) for all the aggregates
    +  * @param groupKeysIndex   the position (in the input Row) of grouping keys
    +  * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
    +  *                         output row => the index of grouping key) for all the grouping keys
    +  * @param finalRowArity    the arity of the final row
    +  */
    +class AggregateAggFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFieldsIndex: Array[Int],
    --- End diff --
    
    `aggFieldsIndex` -> `aggInFields`?



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103467528
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -54,44 +55,51 @@ class AggregateReduceGroupFunction(
       override def open(config: Configuration) {
         Preconditions.checkNotNull(aggregates)
         Preconditions.checkNotNull(groupKeysMapping)
    -    aggregateBuffer = new Row(intermediateRowArity)
    +    aggregateBuffer = new Row(aggregates.length + groupKeysMapping.length)
         output = new Row(finalRowArity)
         if (!groupingSetsMapping.isEmpty) {
           intermediateGroupKeys = Some(groupKeysMapping.map(_._1))
         }
       }
     
       /**
    -   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    -   * calculate aggregated values output by aggregate buffer, and set them into output 
    -   * Row based on the mapping relation between intermediate aggregate data and output data.
    -   *
    -   * @param records  Grouped intermediate aggregate Rows iterator.
    -   * @param out The collector to hand results to.
    -   *
    -   */
    +    * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    +    * calculate aggregated values output by aggregate buffer, and set them into output
    +    * Row based on the mapping relation between intermediate aggregate data and output data.
    +    *
    +    * @param records Grouped intermediate aggregate Rows iterator.
    +    * @param out     The collector to hand results to.
    +    *
    +    */
       override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
     
    -    // Initiate intermediate aggregate value.
    -    aggregates.foreach(_.initiate(aggregateBuffer))
    -
    -    // Merge intermediate aggregate value to buffer.
    +    // merge intermediate aggregate value to buffer.
         var last: Row = null
    -    records.foreach((record) => {
    -      aggregates.foreach(_.merge(record, aggregateBuffer))
    +    val iterator = records.iterator()
    +    val accumulatorList = Array.fill(aggregates.length) {
    +      new JArrayList[Accumulator]()
    +    }
    +
    +    while (iterator.hasNext) {
    +      val record = iterator.next()
    +      for (i <- aggregates.indices) {
    +        accumulatorList(i).add(
    --- End diff --
    
    This materializes the whole group in Lists and will fail for large groups (or in case of a non-grouped aggregate).
    We need to change this to pairwise merges. The `List` should be reused.
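
A pairwise merge with a reused list could look roughly like the sketch below. `CountAcc` and `merge` are hypothetical stand-ins, not Flink's `Accumulator` or `AggregateFunction.merge`; the sketch assumes a merge contract that combines every accumulator in a list into one. The point is only that the list never holds more than two elements, regardless of the group size.

```scala
import java.util.{ArrayList => JArrayList, List => JList}

// Hypothetical stand-in for an accumulator; not Flink's Accumulator.
final class CountAcc(var count: Long)

object PairwiseMergeSketch {
  // Assumed merge contract: combine all accumulators in the list into one.
  def merge(accs: JList[CountAcc]): CountAcc = {
    val result = new CountAcc(0L)
    val it = accs.iterator()
    while (it.hasNext) {
      result.count += it.next().count
    }
    result
  }

  // Holds at most two accumulators at any time, so memory use is
  // independent of the number of records in the group.
  def mergeAll(records: Iterator[CountAcc]): CountAcc = {
    val pair = new JArrayList[CountAcc](2) // allocated once and reused
    pair.add(new CountAcc(0L))             // slot 0: running result
    pair.add(null)                         // slot 1: incoming accumulator
    while (records.hasNext) {
      pair.set(1, records.next())
      pair.set(0, merge(pair))
    }
    pair.get(0)
  }
}
```

Compared with materializing one list per aggregate for the whole group, this trades one `merge` call per record for constant memory.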



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103434488
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util.{ArrayList => JArrayList, List => JList}
    +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Aggregate Function used for the aggregate operator in
    --- End diff --
    
    "Aggregate Function" -> "[[AggregateFunction]]"?



[GitHub] flink issue #3423: [FLINK-5768] [table] Apply new aggregation functions for ...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on the issue:

    https://github.com/apache/flink/pull/3423
  
    @fhueske Thanks for the detailed review. I have addressed all your comments. Please take a look.



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103730884
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala ---
    @@ -76,8 +75,10 @@ class DataSetSessionWindowAggregateCombineGroupFunction(
           new JArrayList[Accumulator]()
         }
     
    +    var count:Int = 0
    --- End diff --
    
    Missing space after the colon: `var count: Int = 0`.



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103478412
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala ---
    @@ -56,26 +46,16 @@ class IncrementalAggregateWindowFunction[W <: Window](
         * Row based on the mapping relation between intermediate aggregate data and output data.
         */
       override def apply(
    -    key: Tuple,
    -    window: W,
    -    records: Iterable[Row],
    -    out: Collector[Row]): Unit = {
    +      key: Tuple,
    --- End diff --
    
    Get the group keys from the `key` `Tuple` argument.



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103445700
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util.{ArrayList => JArrayList, List => JList}
    +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Aggregate Function used for the aggregate operator in
    +  * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
    +  *
    +  * @param aggregates       the list of all [[org.apache.flink.table.functions.AggregateFunction]]
    +  *                         used for this aggregation
    +  * @param aggFieldsIndex   the position (in the input Row) of the input value for each aggregate
    +  * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
    +  *                         output row => the index of the aggregate) for all the aggregates
    +  * @param groupKeysIndex   the position (in the input Row) of grouping keys
    +  * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
    +  *                         output row => the index of grouping key) for all the grouping keys
    +  * @param finalRowArity    the arity of the final row
    +  */
    +class AggregateAggFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFieldsIndex: Array[Int],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val groupKeysIndex: Array[Int],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val finalRowArity: Int)
    +  extends ApiAggFunction[Row, Row, Row] {
    +
    +  override def createAccumulator(): Row = {
    +    val accumulatorRow: Row = new Row(groupKeysIndex.length + aggregates.length)
    +
    +    for (i <- aggregates.indices) {
    +      accumulatorRow.setField(groupKeysIndex.length + i, aggregates(i).createAccumulator())
    +    }
    +    accumulatorRow
    +  }
    +
    +  override def add(value: Row, accumulatorRow: Row) = {
    +    for (i <- groupKeysIndex.indices) {
    +      accumulatorRow.setField(i, value.getField(groupKeysIndex(i)))
    +    }
    +
    +    for (i <- aggregates.indices) {
    +      val accumulator =
    +        accumulatorRow.getField(i + groupKeysIndex.length).asInstanceOf[Accumulator]
    +      val v = value.getField(aggFieldsIndex(i))
    +      aggregates(i).accumulate(accumulator, v)
    +    }
    +  }
    +
    +  override def getResult(accumulatorRow: Row): Row = {
    +    val output = new Row(finalRowArity)
    --- End diff --
    
    `val output = new Row(aggregates.length)`



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103729741
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -95,11 +110,14 @@ class AggregateReduceGroupFunction(
             output.setField(after, last.getField(previous))
         }
     
    -    // get the final aggregate value and set it to output.
    +    // get final aggregate value and set to output.
         aggregateMapping.foreach {
    -      case (after, previous) =>
    +      case (after, previous) => {
             val agg = aggregates(previous)
    -        output.setField(after, agg.getValue(agg.merge(accumulatorList(previous))))
    +        val accumulator = agg.merge(accumulatorList(previous))
    +        val result = aggregates(previous).getValue(accumulator)
    --- End diff --
    
    `aggregates(previous)` -> `agg`



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103437140
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.util.{ArrayList => JArrayList, List => JList}
    +import org.apache.flink.api.common.functions.{AggregateFunction => ApiAggFunction}
    +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.types.Row
    +
    +/**
    +  * Aggregate Function used for the aggregate operator in
    +  * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
    +  *
    +  * @param aggregates       the list of all [[org.apache.flink.table.functions.AggregateFunction]]
    +  *                         used for this aggregation
    +  * @param aggFieldsIndex   the position (in the input Row) of the input value for each aggregate
    +  * @param aggregateMapping the list of the mapping of (the position of this aggregate result in the
    +  *                         output row => the index of the aggregate) for all the aggregates
    +  * @param groupKeysIndex   the position (in the input Row) of grouping keys
    +  * @param groupKeysMapping the list of mapping of (the position of the grouping key in the
    +  *                         output row => the index of grouping key) for all the grouping keys
    +  * @param finalRowArity    the arity of the final row
    +  */
    +class AggregateAggFunction(
    +    private val aggregates: Array[AggregateFunction[_]],
    +    private val aggFieldsIndex: Array[Int],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val groupKeysIndex: Array[Int],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val finalRowArity: Int)
    +  extends ApiAggFunction[Row, Row, Row] {
    +
    +  override def createAccumulator(): Row = {
    +    val accumulatorRow: Row = new Row(groupKeysIndex.length + aggregates.length)
    +
    +    for (i <- aggregates.indices) {
    +      accumulatorRow.setField(groupKeysIndex.length + i, aggregates(i).createAccumulator())
    +    }
    +    accumulatorRow
    +  }
    +
    +  override def add(value: Row, accumulatorRow: Row) = {
    +    for (i <- groupKeysIndex.indices) {
    --- End diff --
    
    No need to keep the grouping keys in the accumulator. They are available in `WindowFunction.apply()` in the key attribute.
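
As a rough illustration of this suggestion, the sketch below keeps only aggregate state in the accumulator and attaches the key when the result is emitted. It is deliberately simplified: `Acc`, `add`, and `emit` are hypothetical names, plain Scala collections stand in for `Row` and `Tuple`, and `add` returns a new accumulator instead of mutating it as the code under review does.

```scala
object KeylessAccumulatorSketch {
  // The accumulator holds only per-aggregate state (here: a sum and a count).
  // No grouping-key fields are stored in it.
  final case class Acc(sum: Long, count: Long)

  def createAccumulator(): Acc = Acc(0L, 0L)

  // Only the aggregate state is updated; the key is never copied per record.
  def add(value: Long, acc: Acc): Acc = Acc(acc.sum + value, acc.count + 1)

  // Stand-in for the window function: the key arrives as a separate
  // argument and is prepended to the aggregate results on output.
  def emit(key: Seq[Any], acc: Acc): Seq[Any] = key ++ Seq(acc.sum, acc.count)
}
```

This avoids rewriting the key fields on every `add` call and keeps the accumulator row as small as the number of aggregates.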



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103474797
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala ---
    @@ -79,44 +85,60 @@ class DataSetSessionWindowAggregateCombineGroupFunction(
             // calculate the current window and open a new window.
             if (windowEnd != null) {
               // emit the current window's merged data
    -          doCollect(out, windowStart, windowEnd)
    +          doCollect(out, accumulatorList, windowStart, windowEnd)
    +
    +          // clear the accumulator list for all aggregate
    +          for (i <- aggregates.indices) {
    +            accumulatorList(i).clear()
    +          }
             } else {
               // set group keys to aggregateBuffer.
               for (i <- groupingKeys.indices) {
                 aggregateBuffer.setField(i, record.getField(i))
               }
             }
     
    -        // initiate intermediate aggregate value.
    -        aggregates.foreach(_.initiate(aggregateBuffer))
             windowStart = record.getField(rowTimeFieldPos).asInstanceOf[Long]
           }
     
    -      // merge intermediate aggregate value to the buffered value.
    -      aggregates.foreach(_.merge(record, aggregateBuffer))
    +      // collect the accumulators for each aggregate
    +      for (i <- aggregates.indices) {
    --- End diff --
    
    We cannot collect all accumulators here and need to merge them pairwise.
    I think it would be good to remove the preparation mapper and use `accumulate()` here, but this would result in even more significant code changes.



[GitHub] flink pull request #3423: [FLINK-5768] [table] Apply new aggregation functio...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3423#discussion_r103432399
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala ---
    @@ -23,7 +23,8 @@ import org.apache.calcite.rel.`type`.RelDataType
     import org.apache.calcite.rel.core.AggregateCall
     import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
     import org.apache.flink.api.java.tuple.Tuple
    -import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
    +import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream,
    --- End diff --
    
    Do not break the line here. Imports may exceed the line limit.

