Posted to issues@flink.apache.org by shaoxuan-wang <gi...@git.apache.org> on 2017/04/18 13:53:07 UTC

[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

GitHub user shaoxuan-wang opened a pull request:

    https://github.com/apache/flink/pull/3735

    [FLINK-6242] [table] Add code generation for DataSet Aggregates

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shaoxuan-wang/flink F6242-submit

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3735.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3735
    
----
commit 16e8aa7400f1b9a9f490522427f269fd01a0f640
Author: shaoxuan-wang <ws...@gmail.com>
Date:   2017-04-18T13:45:49Z

    [FLINK-6242] [table] Add code generation for DataSet Aggregates

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3735: [FLINK-6242] [table] Add code generation for DataSet Aggr...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on the issue:

    https://github.com/apache/flink/pull/3735
  
    @fhueske, thanks for your feedback.
    Yes, we could keep the GeneratedAggregations interface very clean, as follows:
    ```
    abstract class GeneratedAggregations extends Function {
     def setAggregationResults(accumulators: Row, output: Row)
     def setForwardedFields(input: Row, output: Row)
     def accumulate(accumulators: Row, input: Row)
     def retract(accumulators: Row, input: Row)
     def createAccumulators(): Row
     def mergeAccumulatorsPair(a: Row, b: Row): Row
     def resetAccumulator(accumulators: Row)
    }
    ```
    But I feel it might not be a good idea to add more parameters to the code generation function, as callers would usually have to construct unnecessary empty parameters. I think we can break the code generation function into 2-3 functions (these are just interfaces that process the code-gen parameters; the underlying implementation of each function would be shared). Let me prototype the changes, and we can continue the discussion from there.
    
    Regarding your other comments: I did not look into the logic of the previous implementations and just focused on the code-gen. I will take a look and optimize them.
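A minimal sketch of the split proposed above. `AggCodeGenParams`, `generateForDataStream`, and `generateForDataSet` are hypothetical names, not Flink's API: each thin entry point supplies defaults for the parameters its caller does not need and delegates to one shared generator, so callers never build empty arguments themselves. To stay short, the shared generator only emits `setForwardedFields`.

```scala
// Hypothetical parameter bundle; not Flink's actual CodeGenerator signature.
final case class AggCodeGenParams(
    fwdMapping: Array[(Int, Int)],
    groupingKeys: Array[Int],
    outputArity: Int)

object AggCodeGenSketch {
  // Shared core: the single place where code is actually generated.
  private def generateCore(className: String, p: AggCodeGenParams): String = {
    val copies = p.fwdMapping.map { case (in, out) =>
      s"    output.setField($out, input.getField($in));"
    }.mkString("\n")
    s"""public final class $className {
       |  // output arity: ${p.outputArity}, grouping keys: ${p.groupingKeys.mkString(",")}
       |  public void setForwardedFields(Row input, Row output) {
       |$copies
       |  }
       |}""".stripMargin
  }

  // Thin entry point for callers without grouping keys: the empty array is
  // supplied here instead of by every caller.
  def generateForDataStream(
      className: String,
      fwdMapping: Array[(Int, Int)],
      outputArity: Int): String =
    generateCore(className, AggCodeGenParams(fwdMapping, Array.empty[Int], outputArity))

  // Thin entry point for DataSet callers that do pass grouping keys.
  def generateForDataSet(
      className: String,
      fwdMapping: Array[(Int, Int)],
      groupingKeys: Array[Int],
      outputArity: Int): String =
    generateCore(className, AggCodeGenParams(fwdMapping, groupingKeys, outputArity))
}
```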


---

[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111994200
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala ---
    @@ -64,38 +68,22 @@ class DataSetPreAggFunction(
       def preaggregate(records: Iterable[Row], out: Collector[Row]): Unit = {
    --- End diff --
    
    I think we can move the implementation of `preaggregate()` to `combine()` and let `mapPartition()` call `combine()`. `combine()` is called once for each group of records with the same key, while `mapPartition()` is called only once (for the whole partition). This way we can remove some overhead from `combine()`.
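A Flink-free sketch of the suggested restructuring, assuming records are modeled as hypothetical `(key, value)` pairs, the accumulator as a running sum, and an `ArrayBuffer` as a stand-in for the `Collector`. `combine()` does the work and returns its result directly; `mapPartition()`, which runs once per partition, only forwards it.

```scala
import scala.collection.mutable.ArrayBuffer

class PreAggSketch {
  // combine(): pre-aggregates one group of records that share a key and
  // returns the partial result directly, without a Collector in between.
  def combine(records: Iterable[(String, Long)]): (String, Long) = {
    val iterator = records.iterator
    var key: String = null
    var sum = 0L
    while (iterator.hasNext) {
      val (k, v) = iterator.next()
      key = k
      sum += v
    }
    (key, sum)
  }

  // mapPartition(): called once for the whole partition, so it simply
  // delegates to combine() and emits the single result.
  def mapPartition(records: Iterable[(String, Long)], out: ArrayBuffer[(String, Long)]): Unit =
    out += combine(records)
}
```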


---

[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111992686
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala ---
    @@ -21,44 +21,48 @@ import java.lang.Iterable
     
     import org.apache.flink.api.common.functions.RichGroupReduceFunction
     import org.apache.flink.configuration.Configuration
    -import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
     import org.apache.flink.types.Row
     import org.apache.flink.util.{Collector, Preconditions}
    +import org.slf4j.LoggerFactory
     
     /**
       * [[RichGroupReduceFunction]] to compute aggregates that do not support pre-aggregation for batch
       * (DataSet) queries.
       *
    -  * @param aggregates The aggregate functions.
    -  * @param aggInFields The positions of the aggregation input fields.
    +  * @param genAggregations Code-generated [[GeneratedAggregations]]
       * @param gkeyOutMapping The mapping of group keys between input and output positions.
    -  * @param aggOutMapping  The mapping of aggregates to output positions.
       * @param groupingSetsMapping The mapping of grouping set keys between input and output positions.
    -  * @param finalRowArity The arity of the final resulting row.
       */
     class DataSetAggFunction(
    -    private val aggregates: Array[AggregateFunction[_ <: Any]],
    -    private val aggInFields: Array[Array[Int]],
    -    private val aggOutMapping: Array[(Int, Int)],
    +    private val genAggregations: GeneratedAggregationsFunction,
         private val gkeyOutMapping: Array[(Int, Int)],
    --- End diff --
    
    It would be good if we could parameterize the method that generates the code such that we can copy the grouping keys and set the grouping set flags with `GeneratedAggregations.setForwardedFields()`. This should be possible, as it is actually just setting constant boolean flags at certain positions in the output Row.
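A sketch of that idea, assuming a hypothetical generator helper `genSetForwardedFields` and a hypothetical `groupingSetFlags` parameter of `(outputPos, flag)` pairs. Because the flags are known when the code is generated, they can be emitted as boolean literals next to the ordinary field copies, so the runtime function no longer has to set them itself.

```scala
object ForwardFieldsGenSketch {
  def genSetForwardedFields(
      fwdMapping: Array[(Int, Int)],
      groupingSetFlags: Option[Array[(Int, Boolean)]]): String = {
    // Ordinary forwarded fields are copied from the input row at runtime.
    val copies = fwdMapping.map { case (in, out) =>
      s"    output.setField($out, input.getField($in));"
    }
    // Grouping-set flags are constants, so they are written into the code as literals.
    val flags = groupingSetFlags.getOrElse(Array.empty[(Int, Boolean)]).map {
      case (out, flag) => s"    output.setField($out, $flag);"
    }
    val body = (copies ++ flags).mkString("\n")
    s"""  public void setForwardedFields(
       |    org.apache.flink.types.Row input,
       |    org.apache.flink.types.Row output) {
       |$body
       |  }""".stripMargin
  }
}
```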


---

[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r112002707
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala ---
    @@ -18,111 +18,77 @@
     package org.apache.flink.table.runtime.aggregate
     
     import java.lang.Iterable
    -import java.util.{ArrayList => JArrayList}
     
     import org.apache.flink.api.common.functions.RichGroupReduceFunction
     import org.apache.flink.configuration.Configuration
    -import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
     import org.apache.flink.types.Row
    -import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
     
     /**
       * It wraps the aggregate logic inside of
       * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
       *
       * It is used for sliding on batch for both time and count-windows.
       *
    -  * @param aggregates aggregate functions.
    -  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
    -  *                         and output Row.
    -  * @param aggregateMapping index mapping between aggregate function list and aggregated value
    -  *                         index in output Row.
    -  * @param finalRowArity output row field count
    +  * @param genAggregations Code-generated [[GeneratedAggregations]]
    +  * @param keysAndAggregatesArity The total arity of keys and aggregates
       * @param finalRowWindowStartPos relative window-start position to last field of output row
       * @param finalRowWindowEndPos relative window-end position to last field of output row
       * @param windowSize size of the window, used to determine window-end for output row
       */
     class DataSetSlideWindowAggReduceGroupFunction(
    -    aggregates: Array[AggregateFunction[_ <: Any]],
    -    groupKeysMapping: Array[(Int, Int)],
    -    aggregateMapping: Array[(Int, Int)],
    -    finalRowArity: Int,
    +    genAggregations: GeneratedAggregationsFunction,
    +    keysAndAggregatesArity: Int,
         finalRowWindowStartPos: Option[Int],
         finalRowWindowEndPos: Option[Int],
         windowSize: Long)
    -  extends RichGroupReduceFunction[Row, Row] {
    -
    -  Preconditions.checkNotNull(aggregates)
    -  Preconditions.checkNotNull(groupKeysMapping)
    +  extends RichGroupReduceFunction[Row, Row]
    +    with Compiler[GeneratedAggregations] {
     
       private var collector: TimeWindowPropertyCollector = _
    +  protected val windowStartPos: Int = keysAndAggregatesArity
    +
       private var output: Row = _
    -  private val accumulatorStartPos: Int = groupKeysMapping.length
    -  protected val windowStartPos: Int = accumulatorStartPos + aggregates.length
    +  protected var accumulators: Row = _
     
    -  val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
    -    new JArrayList[Accumulator](2)
    -  }
    +  val LOG = LoggerFactory.getLogger(this.getClass)
    +  protected var function: GeneratedAggregations = _
     
       override def open(config: Configuration) {
    -    output = new Row(finalRowArity)
    +    LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n " +
    +                s"Code:\n${genAggregations.code}")
    +    val clazz = compile(
    +      getClass.getClassLoader,
    +      genAggregations.name,
    +      genAggregations.code)
    +    LOG.debug("Instantiating AggregateHelper.")
    +    function = clazz.newInstance()
    +
    +    output = function.createOutputRow()
    +    accumulators = function.createAccumulators()
         collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
    -
    -    // init lists with two empty accumulators
    -    var i = 0
    -    while (i < aggregates.length) {
    -      val accumulator = aggregates(i).createAccumulator()
    -      accumulatorList(i).add(accumulator)
    -      accumulatorList(i).add(accumulator)
    -      i += 1
    -    }
       }
     
       override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
     
    -    // reset first accumulator
    -    var i = 0
    -    while (i < aggregates.length) {
    -      aggregates(i).resetAccumulator(accumulatorList(i).get(0))
    -      i += 1
    -    }
    +    // reset accumulator
    +    function.resetAccumulator(accumulators)
     
         val iterator = records.iterator()
         while (iterator.hasNext) {
           val record = iterator.next()
     
    -      // accumulate
    -      i = 0
    -      while (i < aggregates.length) {
    -        // insert received accumulator into acc list
    -        val newAcc = record.getField(accumulatorStartPos + i).asInstanceOf[Accumulator]
    -        accumulatorList(i).set(1, newAcc)
    -        // merge acc list
    -        val retAcc = aggregates(i).merge(accumulatorList(i))
    -        // insert result into acc list
    -        accumulatorList(i).set(0, retAcc)
    -        i += 1
    -      }
    +      function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
     
           // check if this record is the last record
           if (!iterator.hasNext) {
    --- End diff --
    
    Move this check behind the loop.


---

[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3735


---

[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111995303
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala ---
    @@ -19,88 +19,71 @@
     package org.apache.flink.table.runtime.aggregate
     
     import java.lang.Iterable
    -import java.util.{ArrayList => JArrayList}
     
     import org.apache.flink.api.common.functions.RichGroupReduceFunction
     import org.apache.flink.configuration.Configuration
    -import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
     import org.apache.flink.types.Row
     import org.apache.flink.util.{Collector, Preconditions}
    +import org.slf4j.LoggerFactory
     
     /**
       * [[RichGroupReduceFunction]] to compute the final result of a pre-aggregated aggregation
       * for batch (DataSet) queries.
       *
    -  * @param aggregates The aggregate functions.
    -  * @param aggOutFields The positions of the aggregation results in the output
    +  * @param genAggregations Code-generated [[GeneratedAggregations]]
       * @param gkeyOutFields The positions of the grouping keys in the output
       * @param groupingSetsMapping The mapping of grouping set keys between input and output positions.
    -  * @param finalRowArity The arity of the final resulting row
       */
     class DataSetFinalAggFunction(
    -    private val aggregates: Array[AggregateFunction[_ <: Any]],
    -    private val aggOutFields: Array[Int],
    +    private val genAggregations: GeneratedAggregationsFunction,
         private val gkeyOutFields: Array[Int],
    -    private val groupingSetsMapping: Array[(Int, Int)],
    -    private val finalRowArity: Int)
    -  extends RichGroupReduceFunction[Row, Row] {
    +    private val groupingSetsMapping: Array[(Int, Int)])
    +  extends RichGroupReduceFunction[Row, Row]
    +    with Compiler[GeneratedAggregations] {
     
    -  Preconditions.checkNotNull(aggregates)
    -  Preconditions.checkNotNull(aggOutFields)
       Preconditions.checkNotNull(gkeyOutFields)
       Preconditions.checkNotNull(groupingSetsMapping)
     
       private var output: Row = _
    +  private var accumulators: Row = _
    +
    +  val LOG = LoggerFactory.getLogger(this.getClass)
    +  private var function: GeneratedAggregations = _
     
       private val intermediateGKeys: Option[Array[Int]] = if (!groupingSetsMapping.isEmpty) {
         Some(gkeyOutFields)
       } else {
         None
       }
     
    -  private val numAggs = aggregates.length
    -  private val numGKeys = gkeyOutFields.length
    -
    -  private val accumulators: Array[JArrayList[Accumulator]] =
    -    Array.fill(numAggs)(new JArrayList[Accumulator](2))
    -
       override def open(config: Configuration) {
    -    output = new Row(finalRowArity)
    -
    -    // init lists with two empty accumulators
    -    for (i <- aggregates.indices) {
    -      val accumulator = aggregates(i).createAccumulator()
    -      accumulators(i).add(accumulator)
    -      accumulators(i).add(accumulator)
    -    }
    +    LOG.debug(s"Compiling AggregateHelper: ${genAggregations.name} \n\n " +
    +                s"Code:\n${genAggregations.code}")
    +    val clazz = compile(
    +      getClass.getClassLoader,
    +      genAggregations.name,
    +      genAggregations.code)
    +    LOG.debug("Instantiating AggregateHelper.")
    +    function = clazz.newInstance()
    +
    +    output = function.createOutputRow()
    +    accumulators = function.createAccumulators()
       }
     
       override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
     
         val iterator = records.iterator()
     
         // reset first accumulator
    -    var i = 0
    -    while (i < aggregates.length) {
    -      aggregates(i).resetAccumulator(accumulators(i).get(0))
    -      i += 1
    -    }
    +    function.resetAccumulator(accumulators)
     
    +    var i = 0
         while (iterator.hasNext) {
           val record = iterator.next()
    --- End diff --
    
    We can make `record` a `var` and move its definition outside of the loop.
    Then we can get rid of the `if (!iterator.hasNext)` check in the body of the while loop and set the `output` fields after the loop has terminated.
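The restructured loop might look like the following sketch. `Array[Any]` is a hypothetical stand-in for `Row`, field 0 holds the grouping key, and summing field 1 stands in for the generated accumulate/merge call. The last record is still in scope after the loop, so the per-iteration `hasNext` check disappears.

```scala
object LoopRestructureSketch {
  // Assumes a non-empty group of records.
  def reduce(records: Iterable[Array[Any]]): Array[Any] = {
    val iterator = records.iterator
    var sum = 0L
    // Declared outside the loop so the last record is still available afterwards.
    var record: Array[Any] = null
    while (iterator.hasNext) {
      record = iterator.next()
      sum += record(1).asInstanceOf[Long]
    }
    // Set the output fields once, after the loop has terminated.
    Array[Any](record(0), sum)
  }
}
```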


---

[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r112007816
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
    @@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
       def setAggregationResults(accumulators: Row, output: Row)
     
       /**
    +    * Calculates the results from accumulators, and set the results to the output (with key offset)
    +    *
    +    * @param accumulators the accumulators (saved in a row) which contains the current
    +    *                     aggregated results
    +    * @param output       output results collected in a row
    +    */
    +  def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
    --- End diff --
    
    Actually, I'm not sure if we really need to implement a different code generation function. I had a look at the code generation code and think that we could just add a few more parameters to the current code gen method. Right now, the behavior of most generated methods can be exactly defined:
    
    - `createAccumulators()`: generates a `Row` with the accumulators for each provided `AggregateFunction`. Some methods of `GeneratedAggregations` expect a Row of accumulators with exactly this layout as one of their input parameters. In the following, this parameter is called `accs`.
    - `accumulate(accs, row)`: The `aggFields` parameter controls which fields of `row` are accumulated into which accumulator. We should rename this parameter to `accFields` though, IMO.
    - `retract(accs, row)`: same as for `accumulate`. We should add a separate parameter `retractFields: Array[Int]` though.
    - `setForwardedFields(input, output)`: The `fwdMapping` parameter controls which field of the input row is copied to which position of the output row. We could add an optional parameter to copy the `groupSetMapping` to the output as well.
    - `setAggregationResults(accs, output)`: The `aggMapping` parameter controls to which output fields the aggregation results are copied. If we add another parameter `partialResults: Boolean`, we can control whether to copy final results (`AggregateFunction.getValue()`) or partial results (the accumulator).
    - `createOutputRow()`: the `outputArity` parameter specifies the arity of the output row.
    - `mergeAccumulatorsPair(accs, other)`: **This is the only inflexible method**. We could change the behavior of the method as follows: The method expects as first parameter (`accs`) a Row with the same layout as generated by `createAccumulators`. The second parameter can be any row with accumulators at arbitrary positions. To enable the merging, we add a parameter `mergeMapping: Array[Int]` to the code generating function which defines which fields of the `other` parameter are merged with the fields in the `accs` Row. The method returns a Row with the default layout (as generated by `createAccumulators()`).
    - `resetAccumulator(accs)`: resets a Row of accumulators of the known layout.
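An interpreted sketch of the proposed `mergeAccumulatorsPair(accs, other)` semantics, assuming `Array[Any]` stands in for `Row` and each accumulator is a plain `Long` sum (the generated code would invoke each aggregate function's merge logic instead). `mergeMapping(i)` names the field of `other` that is merged into accumulator `i`, and the result keeps the `createAccumulators()` layout.

```scala
object MergeMappingSketch {
  def mergeAccumulatorsPair(
      accs: Array[Any],        // layout from createAccumulators(): accumulator i at position i
      other: Array[Any],       // any row with accumulators at arbitrary positions
      mergeMapping: Array[Int]): Array[Any] = {
    var i = 0
    while (i < mergeMapping.length) {
      // Merge accumulator i with the accumulator stored at other(mergeMapping(i)).
      accs(i) = accs(i).asInstanceOf[Long] + other(mergeMapping(i)).asInstanceOf[Long]
      i += 1
    }
    accs
  }
}
```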
    
    I haven't checked this thoroughly, but I think with these parameters, we can control the generated code sufficiently to support all aggregation operators for DataSet and DataStream, i.e., we can generate the currently existing functions such that they behave like the more specialized ones that you added. Since all code gen parameters (`accFields`, `retractFields`, `fwdMapping`, `groupSetMapping`, `aggMapping`, `partialResults`, `outputArity`, `mergeMapping`) can be set independently for each type of operator, this should give us the flexibility needed for all types of operators. We only need to parameterize the code generation method appropriately.
    
    In addition, we could make all parameters `Option` and generate empty methods if the parameters for a function are not set. (This could also be a follow-up issue, IMO.)
    
    What do you think, @shaoxuan-wang?
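A sketch of the `Option` idea for a single method, using a hypothetical `genRetract` helper: when `retractFields` is `None`, the generator still emits `retract(accs, input)` so the abstract class is implemented, but with an empty body. The per-field `retractAcc$i(...)` calls in the generated body are hypothetical placeholders, not the code Flink actually generates.

```scala
object OptionalParamsSketch {
  def genRetract(retractFields: Option[Array[Int]]): String = {
    val body = retractFields match {
      case Some(fields) =>
        // One placeholder retract call per accumulator i, reading input field fields(i).
        fields.zipWithIndex.map { case (field, i) =>
          s"    retractAcc$i(accs, input.getField($field));"
        }.mkString("\n")
      case None =>
        // Parameter not set: the method exists but does nothing.
        "    // no fields to retract"
    }
    s"""  public void retract(
       |    org.apache.flink.types.Row accs,
       |    org.apache.flink.types.Row input) {
       |$body
       |  }""".stripMargin
  }
}
```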



---

[GitHub] flink issue #3735: [FLINK-6242] [table] Add code generation for DataSet Aggr...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3735
  
    Thanks @shaoxuan-wang!
    Merging


---

[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111990633
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -263,33 +263,56 @@ class CodeGenerator(
          aggFields: Array[Array[Int]],
          aggMapping: Array[Int],
          fwdMapping: Array[(Int, Int)],
    -     outputArity: Int)
    +     outputArity: Int,
    +     groupingKeys: Array[Int])
       : GeneratedAggregationsFunction = {
     
         def genSetAggregationResults(
           accTypes: Array[String],
           aggs: Array[String],
           aggMapping: Array[Int]): String = {
     
    -      val sig: String =
    +      val sigHelper: String =
             j"""
    -            |  public void setAggregationResults(
    -            |    org.apache.flink.types.Row accs,
    -            |    org.apache.flink.types.Row output)""".stripMargin
    +           |  private final void setAggregationResultsHelper(
    +           |    org.apache.flink.types.Row accs,
    +           |    org.apache.flink.types.Row output,
    +           |    java.lang.Integer offset)""".stripMargin
     
    -      val setAggs: String = {
    +      val setAggsHelper: String = {
             for (i <- aggs.indices) yield
               j"""
                  |    org.apache.flink.table.functions.AggregateFunction baseClass$i =
                  |      (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
                  |
                  |    output.setField(
    -             |      ${aggMapping(i)},
    +             |      ${aggMapping(i)} + offset,
    --- End diff --
    
    `${aggMapping(i)} + offset` -> `${aggMapping(i) + offset}` to add the constant `offset` to the mapping before generating the code.
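The difference is where the addition happens. A sketch using Scala's standard `s` interpolator in place of the `j` interpolator used in `CodeGenerator`, with hypothetical helper names: outside the braces, `+ offset` is copied into the generated Java as text and evaluated on every call; inside `${...}` the sum is evaluated by Scala during code generation, so the Java source contains a single constant index. Folding assumes the offset is known when the code is generated.

```scala
object OffsetFoldingSketch {
  // Original form: the addition ends up in the generated Java source,
  // where `offset` is a runtime parameter of the generated helper.
  def emitRuntimeOffset(mapping: Int): String =
    s"output.setField($mapping + offset, result);"

  // Suggested form: the addition is performed while generating the code.
  def emitFoldedOffset(mapping: Int, offset: Int): String =
    s"output.setField(${mapping + offset}, result);"
}
```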


---

[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111991399
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -407,32 +513,56 @@ class CodeGenerator(
             accTypes: Array[String],
             aggs: Array[String]): String = {
     
    -      val sig: String =
    +      val sigHelper: String =
             j"""
    -           |  public org.apache.flink.types.Row mergeAccumulatorsPair(
    +           |  public final org.apache.flink.types.Row mergeAccumulatorsPairHelper(
                |    org.apache.flink.types.Row a,
    -           |    org.apache.flink.types.Row b)
    +           |    org.apache.flink.types.Row b,
    +           |    java.lang.Integer offset)
                """.stripMargin
    -      val merge: String = {
    +      val mergeHelper: String = {
             for (i <- aggs.indices) yield
               j"""
                  |    ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
    -             |    ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i);
    +             |    ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i + offset);
    --- End diff --
    
    `b.getField($i + offset)` -> `b.getField(${i + offset})`


---

[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r112002140
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala ---
    @@ -30,78 +30,46 @@ import org.apache.flink.types.Row
       *
       * It is used for sliding on batch for both time and count-windows.
       *
    -  * @param aggregates aggregate functions.
    -  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
    -  *                         and output Row.
    -  * @param aggregateMapping index mapping between aggregate function list and aggregated value
    -  *                         index in output Row.
    -  * @param finalRowArity output row field count
    +  * @param genAggregations Code-generated [[GeneratedAggregations]]
    +  * @param keysAndAggregatesArity The total arity of keys and aggregates
       * @param finalRowWindowStartPos relative window-start position to last field of output row
       * @param finalRowWindowEndPos relative window-end position to last field of output row
       * @param windowSize size of the window, used to determine window-end for output row
       */
     class DataSetSlideWindowAggReduceCombineFunction(
    -    aggregates: Array[AggregateFunction[_ <: Any]],
    -    groupKeysMapping: Array[(Int, Int)],
    -    aggregateMapping: Array[(Int, Int)],
    -    finalRowArity: Int,
    +    genAggregations: GeneratedAggregationsFunction,
    +    keysAndAggregatesArity: Int,
         finalRowWindowStartPos: Option[Int],
         finalRowWindowEndPos: Option[Int],
         windowSize: Long)
       extends DataSetSlideWindowAggReduceGroupFunction(
    -    aggregates,
    -    groupKeysMapping,
    -    aggregateMapping,
    -    finalRowArity,
    +    genAggregations,
    +    keysAndAggregatesArity,
         finalRowWindowStartPos,
         finalRowWindowEndPos,
         windowSize)
       with CombineFunction[Row, Row] {
     
    -  private val intermediateRowArity: Int = groupKeysMapping.length + aggregateMapping.length + 1
    -  private val intermediateRow: Row = new Row(intermediateRowArity)
    +  private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1)
     
       override def combine(records: Iterable[Row]): Row = {
     
    -    // reset first accumulator
    -    var i = 0
    -    while (i < aggregates.length) {
    -      aggregates(i).resetAccumulator(accumulatorList(i).get(0))
    -      i += 1
    -    }
    +    // reset accumulator
    +    function.resetAccumulator(accumulators)
     
         val iterator = records.iterator()
         while (iterator.hasNext) {
           val record = iterator.next()
    --- End diff --
    
    Make `record` a `var` and declare it outside of the loop.


---

[GitHub] flink issue #3735: [FLINK-6242] [table] Add code generation for DataSet Aggr...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on the issue:

    https://github.com/apache/flink/pull/3735
  
    @fhueske, your changes look good to me. I left a few comments.


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111978489
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
    @@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
       def setAggregationResults(accumulators: Row, output: Row)
     
       /**
    +    * Calculates the results from accumulators, and set the results to the output (with key offset)
    +    *
    +    * @param accumulators the accumulators (saved in a row) which contains the current
    +    *                     aggregated results
    +    * @param output       output results collected in a row
    +    */
    +  def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
    --- End diff --
    
    I don't think we need to extend the `GeneratedAggregations` interface (except for `resetAccumulators()`).
    I would rather implement another code-generation function that implements the existing methods differently. This would mean adding another method to `CodeGenerator` that generates a `GeneratedAggregations` implementation suitable for the DataSet aggregations.
    
    - `setAggregationResultsWithKeyOffset` -> `setAggregationResults`
    - `setKeyToOutput` -> `setForwardedFields`
    - `accumulateWithKeyOffset` -> `accumulate`
    - `createAccumulatorsAndSetToOutput` could be replaced by `createAccumulators` (called once to create reusable accumulators), `resetAccumulators`, and `setAggregationResults` (if it sets the accumulators instead of calling `AggFunction.getValue()`, see below)
    - `copyAccumulatorsToBuffer` -> `setAggregationResults` (the accumulators are partial aggregation results). This would mean that `setAggregationResults()` has two behaviors: setting the final result (`getValue()`) or the partial result (the accumulator). A simple flag during code generation would select either the final or the partial result.
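The flag idea in the last bullet can be sketched as follows. This is a minimal illustration, not Flink's `CodeGenerator`: `SetResultsCodeGen`, `genSetAggregationResults`, and the `emitPartial` parameter are hypothetical names, and the generated Java calls `getValue()` directly on each aggregate expression instead of first casting it to `AggregateFunction`. The point is that the final-versus-partial choice is made once while generating source, so the compiled method contains no runtime branch.

```scala
// Hypothetical code-generation helper (not Flink's actual CodeGenerator API).
object SetResultsCodeGen {

  /** Generates Java source for a `setAggregationResults` method.
    *
    * @param aggs        Java expressions referring to the aggregate function instances
    * @param accTypes    fully qualified accumulator class names, one per aggregate
    * @param aggMapping  output field index of each aggregate
    * @param emitPartial true: forward the accumulator itself (partial result);
    *                    false: call getValue() on it (final result)
    */
  def genSetAggregationResults(
      aggs: Array[String],
      accTypes: Array[String],
      aggMapping: Array[Int],
      emitPartial: Boolean): String = {

    // The flag is evaluated here, at code-generation time, so the generated
    // Java method contains no runtime branch.
    val setFields = aggs.indices.map { i =>
      val value =
        if (emitPartial) s"accs.getField($i)"
        else s"${aggs(i)}.getValue((${accTypes(i)}) accs.getField($i))"
      s"    output.setField(${aggMapping(i)}, $value);"
    }.mkString("\n")

    s"""  public void setAggregationResults(
       |    org.apache.flink.types.Row accs,
       |    org.apache.flink.types.Row output) {
       |$setFields
       |  }""".stripMargin
  }
}
```

Under this sketch, a combiner would be generated with `emitPartial = true` (it forwards accumulators) and the final reducer with `emitPartial = false`.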


[GitHub] flink issue #3735: [FLINK-6242] [table] Add code generation for DataSet Aggr...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3735
  
    Hi @shaoxuan-wang, thanks for the PR. The changes look good.
    I opened a PR against your PR branch and refactored the `CodeGenerator.generateAggregations()` method a bit. Among other things, I added code generation for grouping sets.
    
    Let me know what you think.
    
    Best, Fabian


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111999493
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala ---
    @@ -110,12 +110,8 @@ class DataSetSessionWindowAggregatePreProcessor(
         var windowEnd: java.lang.Long = null
    --- End diff --
    
    Move the implementation to `combine()`; `mapPartition()` can then simply forward its call to `combine()`.
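A minimal sketch of that suggestion, assuming simplified stand-ins instead of Flink's `MapPartitionFunction`/`GroupCombineFunction` and `Row` types: records are hypothetical `(timestamp, value)` pairs sorted by timestamp, `Partial`, `ListCollector`, and `SessionPreAggSketch` are illustrative names, and each session's partial result is just a sum. Because both methods take an iterable of records and a collector, `mapPartition()` can delegate to `combine()` and the logic exists only once.

```scala
import java.lang.{Iterable => JIterable}
import scala.collection.mutable.ArrayBuffer

// Hypothetical partial result of one session: window bounds plus a running sum.
final case class Partial(windowStart: Long, windowEnd: Long, sum: Long)

// Minimal stand-in for Flink's Collector that records emitted values.
final class ListCollector[T] {
  val emitted: ArrayBuffer[T] = ArrayBuffer.empty[T]
  def collect(t: T): Unit = emitted += t
}

// Hypothetical pre-processor; records are (timestamp, value) pairs, assumed sorted by timestamp.
class SessionPreAggSketch(gap: Long) {

  // All pre-aggregation logic lives in combine(); one Partial is emitted per session.
  def combine(records: JIterable[(Long, Long)], out: ListCollector[Partial]): Unit = {
    val iterator = records.iterator()
    var inSession = false
    var windowStart = 0L
    var windowEnd = 0L
    var sum = 0L
    while (iterator.hasNext) {
      val (ts, value) = iterator.next()
      // A distance of at least `gap` from the previous record closes the current session.
      if (inSession && ts >= windowEnd) {
        out.collect(Partial(windowStart, windowEnd, sum))
        inSession = false
      }
      if (!inSession) {
        windowStart = ts
        sum = 0L
        inSession = true
      }
      windowEnd = ts + gap
      sum += value
    }
    if (inSession) out.collect(Partial(windowStart, windowEnd, sum))
  }

  // mapPartition() has the same shape and simply forwards to combine().
  def mapPartition(records: JIterable[(Long, Long)], out: ListCollector[Partial]): Unit =
    combine(records, out)
}
```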


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111993502
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala ---
    @@ -64,38 +68,22 @@ class DataSetPreAggFunction(
       def preaggregate(records: Iterable[Row], out: Collector[Row]): Unit = {
     
         // create accumulators
    -    var i = 0
    -    while (i < aggregates.length) {
    -      accumulators(i) = aggregates(i).createAccumulator()
    -      i += 1
    -    }
    +    accumulators = function.createAccumulators()
    --- End diff --
    
    Create the accumulators once and reuse them via `function.resetAccumulators()`?


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111983678
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala ---
    @@ -68,23 +72,16 @@ class DataSetAggFunction(
       override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
     
         // create accumulators
    -    var i = 0
    -    while (i < aggregates.length) {
    -      accumulators(i) = aggregates(i).createAccumulator()
    -      i += 1
    -    }
    +    accumulators = function.createAccumulators()
     
         val iterator = records.iterator()
     
         while (iterator.hasNext) {
           val record = iterator.next()
    +      var i = 0
     
           // accumulate
    -      i = 0
    -      while (i < aggregates.length) {
    -        aggregates(i).accumulate(accumulators(i), record.getField(aggInFields(i)(0)))
    -        i += 1
    -      }
    +      function.accumulate(accumulators, record)
     
           // check if this record is the last record
           if (!iterator.hasNext) {
    --- End diff --
    
    Couldn't we use `function.setForwardedFields()` to forward the grouping keys to the output?


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r112002278
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala ---
    @@ -145,44 +121,23 @@ class DataSetSlideTimeWindowAggReduceGroupFunction(
       override def combine(records: Iterable[Row]): Row = {
     
         // reset first accumulator
    -    var i = 0
    -    while (i < aggregates.length) {
    -      aggregates(i).resetAccumulator(accumulatorList(i).get(0))
    -      i += 1
    -    }
    +    function.resetAccumulator(accumulators)
     
         val iterator = records.iterator()
    +
         while (iterator.hasNext) {
           val record = iterator.next()
     
    -      i = 0
    -      while (i < aggregates.length) {
    -        // insert received accumulator into acc list
    -        val newAcc = record.getField(groupingKeysLength + i).asInstanceOf[Accumulator]
    -        accumulatorList(i).set(1, newAcc)
    -        // merge acc list
    -        val retAcc = aggregates(i).merge(accumulatorList(i))
    -        // insert result into acc list
    -        accumulatorList(i).set(0, retAcc)
    -        i += 1
    -      }
    +      function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
     
           // check if this record is the last record
           if (!iterator.hasNext) {
    --- End diff --
    
    Move this last-record check behind the loop.


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111986331
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
    @@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
       def setAggregationResults(accumulators: Row, output: Row)
     
       /**
    +    * Calculates the results from accumulators, and set the results to the output (with key offset)
    +    *
    +    * @param accumulators the accumulators (saved in a row) which contains the current
    +    *                     aggregated results
    +    * @param output       output results collected in a row
    +    */
    +  def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
    --- End diff --
    
    This sounds like a very good idea. I had actually thought about merging the *WithKeyOffset functions into the existing functions. That works for most functions, but `accumulate` and `setAggregationResults` are a little tricky: they do not need a key offset in the regular aggregation path, but they do need one in the merge path.


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111986866
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -263,33 +263,56 @@ class CodeGenerator(
          aggFields: Array[Array[Int]],
          aggMapping: Array[Int],
          fwdMapping: Array[(Int, Int)],
    -     outputArity: Int)
    +     outputArity: Int,
    +     groupingKeys: Array[Int])
       : GeneratedAggregationsFunction = {
     
         def genSetAggregationResults(
           accTypes: Array[String],
           aggs: Array[String],
           aggMapping: Array[Int]): String = {
     
    -      val sig: String =
    +      val sigHelper: String =
             j"""
    -            |  public void setAggregationResults(
    -            |    org.apache.flink.types.Row accs,
    -            |    org.apache.flink.types.Row output)""".stripMargin
    +           |  private final void setAggregationResultsHelper(
    +           |    org.apache.flink.types.Row accs,
    +           |    org.apache.flink.types.Row output,
    +           |    java.lang.Integer offset)""".stripMargin
     
    -      val setAggs: String = {
    +      val setAggsHelper: String = {
             for (i <- aggs.indices) yield
               j"""
                  |    org.apache.flink.table.functions.AggregateFunction baseClass$i =
                  |      (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
                  |
                  |    output.setField(
    -             |      ${aggMapping(i)},
    +             |      ${aggMapping(i)} + offset,
                  |      baseClass$i.getValue((${accTypes(i)}) accs.getField($i)));""".stripMargin
           }.mkString("\n")
     
    -      j"""$sig {
    -         |$setAggs
    +      val setAggregationResults: String =
    +        j"""
    +           |  public void setAggregationResults(
    +           |    org.apache.flink.types.Row accs,
    +           |    org.apache.flink.types.Row output) {
    +           |    setAggregationResultsHelper(accs, output, 0);
    --- End diff --
    
    Code-generated methods should be as "flat" as possible. Calling helper methods adds overhead compared to inlining the code.
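One way to keep the generated code flat while still avoiding duplication in the generator is sketched below. This is an illustration under assumptions, not the PR's code: `FlatCodeGen`, `genMethod`, and `setFieldStatements` are hypothetical names. The helper runs only at code-generation time, the key offset is folded into a constant field index, and each emitted Java method is complete on its own, so no extra method call happens at runtime.

```scala
// Hypothetical generator sketch (not Flink's actual CodeGenerator API).
object FlatCodeGen {

  // Generator-side helper: reused while generating source, never called at runtime.
  private def setFieldStatements(
      accTypes: Array[String],
      aggs: Array[String],
      aggMapping: Array[Int],
      offset: Int): String =
    aggs.indices.map { i =>
      // The offset is added here, so the generated code uses a constant field index.
      val pos = aggMapping(i) + offset
      s"    output.setField($pos, ${aggs(i)}.getValue((${accTypes(i)}) accs.getField($i)));"
    }.mkString("\n")

  // Emits a complete, self-contained method: no call to a generated helper method.
  def genMethod(
      name: String,
      accTypes: Array[String],
      aggs: Array[String],
      aggMapping: Array[Int],
      offset: Int): String =
    s"""  public void $name(
       |    org.apache.flink.types.Row accs,
       |    org.apache.flink.types.Row output) {
       |${setFieldStatements(accTypes, aggs, aggMapping, offset)}
       |  }""".stripMargin
}
```

Calling `genMethod` twice, once with `offset = 0` and once with the number of grouping keys, would yield two flat methods from the same generator code.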


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r112002408
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala ---
    @@ -38,74 +38,58 @@ import org.apache.flink.util.{Collector, Preconditions}
       * it does no final aggregate evaluation. It also includes the logic of
       * [[DataSetSlideTimeWindowAggFlatMapFunction]].
       *
    -  * @param aggregates aggregate functions
    -  * @param groupingKeysLength number of grouping keys
    -  * @param timeFieldPos position of aligned time field
    +  * @param genAggregations Code-generated [[GeneratedAggregations]]
    +  * @param keysAndAggregatesArity The total arity of keys and aggregates
       * @param windowSize window size of the sliding window
       * @param windowSlide window slide of the sliding window
       * @param returnType return type of this function
       */
     class DataSetSlideTimeWindowAggReduceGroupFunction(
    -    private val aggregates: Array[AggregateFunction[_ <: Any]],
    -    private val groupingKeysLength: Int,
    -    private val timeFieldPos: Int,
    +    private val genAggregations: GeneratedAggregationsFunction,
    +    private val keysAndAggregatesArity: Int,
         private val windowSize: Long,
         private val windowSlide: Long,
         @transient private val returnType: TypeInformation[Row])
       extends RichGroupReduceFunction[Row, Row]
       with CombineFunction[Row, Row]
    -  with ResultTypeQueryable[Row] {
    +  with ResultTypeQueryable[Row]
    +  with Compiler[GeneratedAggregations] {
     
    -  Preconditions.checkNotNull(aggregates)
    +  private val timeFieldPos = returnType.getArity - 1
    +  private val intermediateWindowStartPos = keysAndAggregatesArity
     
       protected var intermediateRow: Row = _
    -  // add one field to store window start
    -  protected val intermediateRowArity: Int = groupingKeysLength + aggregates.length + 1
    -  protected val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
    -    new JArrayList[Accumulator](2)
    -  }
    -  private val intermediateWindowStartPos: Int = intermediateRowArity - 1
    +  private var accumulators: Row = _
    +
    +  val LOG = LoggerFactory.getLogger(this.getClass)
    +  private var function: GeneratedAggregations = _
     
       override def open(config: Configuration) {
    -    intermediateRow = new Row(intermediateRowArity)
    -
    -    // init lists with two empty accumulators
    -    var i = 0
    -    while (i < aggregates.length) {
    -      val accumulator = aggregates(i).createAccumulator()
    -      accumulatorList(i).add(accumulator)
    -      accumulatorList(i).add(accumulator)
    -      i += 1
    -    }
    +    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
    +                s"Code:\n$genAggregations.code")
    +    val clazz = compile(
    +      getClass.getClassLoader,
    +      genAggregations.name,
    +      genAggregations.code)
    +    LOG.debug("Instantiating AggregateHelper.")
    +    function = clazz.newInstance()
    +
    +    accumulators = function.createAccumulators()
    +    intermediateRow = function.createOutputRow()
       }
     
       override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
     
         // reset first accumulator
    -    var i = 0
    -    while (i < aggregates.length) {
    -      val accumulator = aggregates(i).createAccumulator()
    -      accumulatorList(i).set(0, accumulator)
    -      i += 1
    -    }
    +    function.resetAccumulator(accumulators)
     
         val iterator = records.iterator()
     
         while (iterator.hasNext) {
           val record = iterator.next()
     
           // accumulate
    -      i = 0
    -      while (i < aggregates.length) {
    -        // insert received accumulator into acc list
    -        val newAcc = record.getField(groupingKeysLength + i).asInstanceOf[Accumulator]
    -        accumulatorList(i).set(1, newAcc)
    -        // merge acc list
    -        val retAcc = aggregates(i).merge(accumulatorList(i))
    -        // insert result into acc list
    -        accumulatorList(i).set(0, retAcc)
    -        i += 1
    -      }
    +      function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
     
           // trigger tumbling evaluation
           if (!iterator.hasNext) {
    --- End diff --
    
    Move this last-record check behind the loop.


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111982297
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
    @@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
       def setAggregationResults(accumulators: Row, output: Row)
     
       /**
    +    * Calculates the results from accumulators, and set the results to the output (with key offset)
    +    *
    +    * @param accumulators the accumulators (saved in a row) which contains the current
    +    *                     aggregated results
    +    * @param output       output results collected in a row
    +    */
    +  def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
    --- End diff --
    
    We could reuse all your code, but just put it into a different method of the `CodeGenerator` and make it implement the existing methods. Their interfaces are the same.


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111995370
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala ---
    @@ -68,23 +72,16 @@ class DataSetAggFunction(
       override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
     
         // create accumulators
    -    var i = 0
    -    while (i < aggregates.length) {
    -      accumulators(i) = aggregates(i).createAccumulator()
    -      i += 1
    -    }
    +    accumulators = function.createAccumulators()
     
         val iterator = records.iterator()
     
         while (iterator.hasNext) {
           val record = iterator.next()
    --- End diff --
    
    We can make `record` a `var` and move its definition outside of the loop.
    Then we can get rid of the `if (!iterator.hasNext)` check in the body of the while loop and set the `output` fields after the loop has terminated.
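A minimal sketch of the suggested loop shape, assuming a hypothetical `HoistedLoop.reduceGroup` over plain `Array[Long]` records of the form `Array(key, value)` instead of Flink `Row`s. The loop body only accumulates; forwarding the key and setting the result happen once after the loop, using the last record. The sketch assumes a non-empty group.

```scala
import java.lang.{Iterable => JIterable}

object HoistedLoop {

  // Hypothetical group reduce: records are Array(key, value); the output holds
  // the forwarded key and SUM(value). Assumes the group is non-empty.
  def reduceGroup(records: JIterable[Array[Long]], output: Array[Long]): Array[Long] = {
    val iterator = records.iterator()
    var record: Array[Long] = null // declared outside the loop so it stays in scope afterwards
    var sum = 0L
    while (iterator.hasNext) {
      record = iterator.next()
      sum += record(1) // accumulate only; no per-record "is this the last one?" check
    }
    // After the loop, `record` is the last record of the group.
    output(0) = record(0) // forward the grouping key
    output(1) = sum       // set the aggregation result
    output
  }
}
```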


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r112001966
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala ---
    @@ -30,78 +30,46 @@ import org.apache.flink.types.Row
       *
       * It is used for sliding on batch for both time and count-windows.
       *
    -  * @param aggregates aggregate functions.
    -  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
    -  *                         and output Row.
    -  * @param aggregateMapping index mapping between aggregate function list and aggregated value
    -  *                         index in output Row.
    -  * @param finalRowArity output row field count
    +  * @param genAggregations Code-generated [[GeneratedAggregations]]
    +  * @param keysAndAggregatesArity The total arity of keys and aggregates
       * @param finalRowWindowStartPos relative window-start position to last field of output row
       * @param finalRowWindowEndPos relative window-end position to last field of output row
       * @param windowSize size of the window, used to determine window-end for output row
       */
     class DataSetSlideWindowAggReduceCombineFunction(
    -    aggregates: Array[AggregateFunction[_ <: Any]],
    -    groupKeysMapping: Array[(Int, Int)],
    -    aggregateMapping: Array[(Int, Int)],
    -    finalRowArity: Int,
    +    genAggregations: GeneratedAggregationsFunction,
    +    keysAndAggregatesArity: Int,
         finalRowWindowStartPos: Option[Int],
         finalRowWindowEndPos: Option[Int],
         windowSize: Long)
       extends DataSetSlideWindowAggReduceGroupFunction(
    -    aggregates,
    -    groupKeysMapping,
    -    aggregateMapping,
    -    finalRowArity,
    +    genAggregations,
    +    keysAndAggregatesArity,
         finalRowWindowStartPos,
         finalRowWindowEndPos,
         windowSize)
       with CombineFunction[Row, Row] {
     
    -  private val intermediateRowArity: Int = groupKeysMapping.length + aggregateMapping.length + 1
    -  private val intermediateRow: Row = new Row(intermediateRowArity)
    +  private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1)
     
       override def combine(records: Iterable[Row]): Row = {
     
    -    // reset first accumulator
    -    var i = 0
    -    while (i < aggregates.length) {
    -      aggregates(i).resetAccumulator(accumulatorList(i).get(0))
    -      i += 1
    -    }
    +    // reset accumulator
    +    function.resetAccumulator(accumulators)
     
         val iterator = records.iterator()
         while (iterator.hasNext) {
           val record = iterator.next()
     
    -      // accumulate
    -      i = 0
    -      while (i < aggregates.length) {
    -        // insert received accumulator into acc list
    -        val newAcc = record.getField(groupKeysMapping.length + i).asInstanceOf[Accumulator]
    -        accumulatorList(i).set(1, newAcc)
    -        // merge acc list
    -        val retAcc = aggregates(i).merge(accumulatorList(i))
    -        // insert result into acc list
    -        accumulatorList(i).set(0, retAcc)
    -        i += 1
    -      }
    +      function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
     
           // check if this record is the last record
           if (!iterator.hasNext) {
    --- End diff --
    
    Move this behind the loop to avoid evaluating the condition in every iteration of the loop body.


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111994938
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala ---
    @@ -64,38 +68,22 @@ class DataSetPreAggFunction(
       def preaggregate(records: Iterable[Row], out: Collector[Row]): Unit = {
     
         // create accumulators
    -    var i = 0
    -    while (i < aggregates.length) {
    -      accumulators(i) = aggregates(i).createAccumulator()
    -      i += 1
    -    }
    +    accumulators = function.createAccumulators()
     
         val iterator = records.iterator()
     
         while (iterator.hasNext) {
           val record = iterator.next()
    --- End diff --
    
    We can make `record` a `var` and move its definition outside of the loop.
    Then we can get rid of the `if (!iterator.hasNext)` check in the body of the while loop and set the `output` fields after the loop has terminated.


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111984555
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala ---
    @@ -97,12 +94,7 @@ class DataSetAggFunction(
             }
     
             // set agg results to output
    -        i = 0
    -        while (i < aggOutMapping.length) {
    -          val (out, in) = aggOutMapping(i)
    -          output.setField(out, aggregates(in).getValue(accumulators(in)))
    -          i += 1
    -        }
    +        function.setAggregationResults(accumulators, output)
     
             // set grouping set flags to output
             if (intermediateGKeys.isDefined) {
    --- End diff --
    
    I think this should eventually be integrated with `setForwardedFields()` as well.
    For now, we can leave it as it is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111991648
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -451,13 +581,34 @@ class CodeGenerator(
           {
             for (i <- accTypes.indices) yield
               j"""
    -             |    accList$i = new java.util.ArrayList<${accTypes(i)}>(2);
    +             |    accList$i = new java.util.ArrayList<${accTypes(i)}>();
    --- End diff --
    
    Why not create the `ArrayList` with an initial capacity of 2?
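For context, the pairwise-merge pattern that this list supports can be sketched as below. The names `PairwiseMerge` and `mergeSums` are hypothetical, and `mergeSums` only imitates the shape of `AggregateFunction.merge()`, which the removed code calls with a `java.util.List` of accumulators. Since the list never holds more than two entries, an initial capacity of 2 sizes its backing array exactly.

```scala
import java.util.{ArrayList => JArrayList, List => JList}

object PairwiseMerge {

  // Hypothetical merge in the style of AggregateFunction.merge(java.util.List[ACC]): sums all entries.
  private def mergeSums(accs: JList[java.lang.Long]): java.lang.Long = {
    var total = 0L
    var i = 0
    while (i < accs.size()) {
      total += accs.get(i).longValue()
      i += 1
    }
    java.lang.Long.valueOf(total)
  }

  // The list only ever holds two entries: the running accumulator (index 0) and
  // the incoming partial accumulator (index 1). Capacity 2 sizes the backing array
  // exactly; in OpenJDK 8+, `new ArrayList()` grows to a default capacity of 10 on the first add.
  private val accList = new JArrayList[java.lang.Long](2)
  accList.add(java.lang.Long.valueOf(0L))
  accList.add(java.lang.Long.valueOf(0L))

  def mergeAll(partials: Seq[Long]): Long = {
    accList.set(0, java.lang.Long.valueOf(0L)) // reset the running accumulator
    for (p <- partials) {
      accList.set(1, java.lang.Long.valueOf(p)) // insert the received accumulator
      accList.set(0, mergeSums(accList))         // merge and keep the result at index 0
    }
    accList.get(0).longValue()
  }
}
```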


[GitHub] flink issue #3735: [FLINK-6242] [table] Add code generation for DataSet Aggr...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3735
  
    Hi @shaoxuan-wang, I'm fine with either approach: a single method with additional parameters, or multiple methods. If you think the multiple-methods approach is better, let's go for it.
    
    Thanks, Fabian


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r111983391
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala ---
    @@ -68,23 +72,16 @@ class DataSetAggFunction(
       override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
     
         // create accumulators
    -    var i = 0
    -    while (i < aggregates.length) {
    -      accumulators(i) = aggregates(i).createAccumulator()
    -      i += 1
    -    }
    +    accumulators = function.createAccumulators()
    --- End diff --
    
    We could create the accumulators once and use `function.resetAccumulators()` to reset and reuse the object.
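The reuse pattern can be sketched like this, assuming a hypothetical `AggHelper` trait in place of the generated `GeneratedAggregations` class and plain `Array[Long]` values in place of `Row`s. The accumulators are allocated once per function instance and reset at the start of each group, so no accumulator objects are allocated per group.

```scala
// Hypothetical, heavily simplified stand-in for the generated aggregation helper.
trait AggHelper {
  def createAccumulators(): Array[Long]
  def resetAccumulators(accs: Array[Long]): Unit
  def accumulate(accs: Array[Long], record: Array[Long]): Unit
}

// Example helper computing SUM(field 1) and COUNT(*).
object SumCountHelper extends AggHelper {
  def createAccumulators(): Array[Long] = new Array[Long](2)
  def resetAccumulators(accs: Array[Long]): Unit = java.util.Arrays.fill(accs, 0L)
  def accumulate(accs: Array[Long], record: Array[Long]): Unit = {
    accs(0) += record(1)
    accs(1) += 1
  }
}

class ReusingAggFunction(function: AggHelper) {
  // Created once (in Flink this would typically happen in open()) and reused for every group.
  private val accumulators: Array[Long] = function.createAccumulators()

  def reduce(records: Iterator[Array[Long]], emit: Array[Long] => Unit): Unit = {
    function.resetAccumulators(accumulators) // reset instead of allocating new accumulators
    while (records.hasNext) function.accumulate(accumulators, records.next())
    emit(accumulators)
  }
}
```

Without the reset, each group would start from the previous group's totals.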


[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r112003763
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala ---
    @@ -25,58 +25,56 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
     import org.apache.flink.api.java.typeutils.ResultTypeQueryable
     import org.apache.flink.configuration.Configuration
     import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    -import org.apache.flink.table.functions.AggregateFunction
    +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
     import org.apache.flink.types.Row
    -import org.apache.flink.util.Preconditions
    -
    +import org.slf4j.LoggerFactory
     
     /**
       * This map function only works for windows on batch tables.
       * It appends an (aligned) rowtime field to the end of the output row.
    +  *
    +  * @param genAggregations      Code-generated [[GeneratedAggregations]]
    +  * @param timeFieldPos         Time field position in input row
    +  * @param tumbleTimeWindowSize The size of tumble time window
       */
     class DataSetWindowAggMapFunction(
    -    private val aggregates: Array[AggregateFunction[_]],
    -    private val aggFields: Array[Array[Int]],
    -    private val groupingKeys: Array[Int],
    -    private val timeFieldPos: Int, // time field position in input row
    +    private val genAggregations: GeneratedAggregationsFunction,
    +    private val timeFieldPos: Int,
         private val tumbleTimeWindowSize: Option[Long],
         @transient private val returnType: TypeInformation[Row])
    -  extends RichMapFunction[Row, Row] with ResultTypeQueryable[Row] {
    -
    -  Preconditions.checkNotNull(aggregates)
    -  Preconditions.checkNotNull(aggFields)
    -  Preconditions.checkArgument(aggregates.length == aggFields.length)
    +  extends RichMapFunction[Row, Row]
    +    with ResultTypeQueryable[Row]
    +    with Compiler[GeneratedAggregations] {
     
       private var output: Row = _
    -  // add one more arity to store rowtime
    -  private val partialRowLength = groupingKeys.length + aggregates.length + 1
    -  // rowtime index in the buffer output row
    -  private val rowtimeIndex: Int = partialRowLength - 1
    +
    +  val LOG = LoggerFactory.getLogger(this.getClass)
    +  private var function: GeneratedAggregations = _
     
       override def open(config: Configuration) {
    -    output = new Row(partialRowLength)
    +    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
    +                s"Code:\n$genAggregations.code")
    +    val clazz = compile(
    +      getRuntimeContext.getUserCodeClassLoader,
    +      genAggregations.name,
    +      genAggregations.code)
    +    LOG.debug("Instantiating AggregateHelper.")
    +    function = clazz.newInstance()
    +
    +    output = function.createOutputRow()
       }
     
       override def map(input: Row): Row = {
     
    -    var i = 0
    -    while (i < aggregates.length) {
    -      val agg = aggregates(i)
    -      val fieldValue = input.getField(aggFields(i)(0))
    -      val accumulator = agg.createAccumulator()
    -      agg.accumulate(accumulator, fieldValue)
    -      output.setField(groupingKeys.length + i, accumulator)
    -      i += 1
    -    }
    +    function.createAccumulatorsAndSetToOutput(output)
    --- End diff --
    
    Could we create the accumulators once in `open()` with `function.createAccumulators()`, reset them here, and copy them to `output` with `function.setAggregationResults()`?
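    The suggested `open()`/`map()` split could look roughly like the following plain-Scala sketch. All names are hypothetical, rows are modeled as `Array[Any]`, and the output layout (grouping key, COUNT, SUM, aligned rowtime) is an assumption chosen for illustration. The rowtime alignment uses `rowtime - rowtime % size`, which matches a zero-offset tumbling-window start only for non-negative timestamps.

    ```scala
    // Hypothetical, simplified helper interface; the real generated class works on Flink Rows.
    trait MapAggregations {
      def createAccumulators(): Array[Long]
      def resetAccumulators(acc: Array[Long]): Unit
      def accumulate(acc: Array[Long], input: Array[Any]): Unit
      def setAggregationResults(acc: Array[Long], output: Array[Any]): Unit
    }

    // COUNT(*) and SUM(input field 1), written to output positions 1 and 2.
    class CountSumAggregations extends MapAggregations {
      def createAccumulators(): Array[Long] = Array(0L, 0L)
      def resetAccumulators(acc: Array[Long]): Unit = java.util.Arrays.fill(acc, 0L)
      def accumulate(acc: Array[Long], input: Array[Any]): Unit = {
        acc(0) += 1
        acc(1) += input(1).asInstanceOf[Long]
      }
      def setAggregationResults(acc: Array[Long], output: Array[Any]): Unit = {
        output(1) = acc(0)
        output(2) = acc(1)
      }
    }

    // Output layout (assumed): [groupingKey, count, sum, alignedRowtime].
    class WindowAggMapSketch(function: MapAggregations, timeFieldPos: Int, windowSize: Option[Long]) {
      private var accumulators: Array[Long] = null
      private var output: Array[Any] = null

      def open(): Unit = {
        accumulators = function.createAccumulators() // allocated once
        output = new Array[Any](4)                   // reused for every record
      }

      def map(input: Array[Any]): Array[Any] = {
        function.resetAccumulators(accumulators)
        function.accumulate(accumulators, input)
        function.setAggregationResults(accumulators, output)
        output(0) = input(0) // forward the grouping key
        val rowtime = input(timeFieldPos).asInstanceOf[Long]
        // Align rowtime to the start of its tumbling window (non-negative timestamps only).
        output(3) = windowSize match {
          case Some(size) => rowtime - (rowtime % size)
          case None       => rowtime
        }
        output
      }
    }
    ```

    `map()` returns the same `output` array on every call, so a caller must consume or copy it before the next invocation, mirroring how the original function reuses its `output` row.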