Posted to issues@flink.apache.org by ChengXiangLi <gi...@git.apache.org> on 2016/03/01 06:18:37 UTC

[GitHub] flink pull request: [FLINK-3474] support partial aggregate

GitHub user ChengXiangLi opened a pull request:

    https://github.com/apache/flink/pull/1746

    [FLINK-3474] support partial aggregate

    This PR includes:
    * A new aggregate interface that supports partial aggregation.
    * Simple aggregate function implementations, such as SUM/AVG/COUNT/MIN/MAX.
    * DataSetAggregateRule, which translates a logical Calcite aggregate node into Flink operator functions.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ChengXiangLi/flink partialAggregate

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1746.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1746
    
----
commit a146ff09990ffe612a02c6441a13f1062c6426a9
Author: chengxiang li <ch...@intel.com>
Date:   2016-03-01T02:47:35Z

    [FLINK-3474] support partial aggregate
    
    d

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54554936
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -0,0 +1,309 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.calcite.rel.`type`._
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
    +import org.apache.calcite.sql.fun._
    +import org.apache.flink.api.common.functions.{GroupReduceFunction, MapFunction}
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.api.table.plan.PlanGenException
    +import org.apache.flink.api.table.plan.nodes.logical.FlinkAggregate
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +object AggregateUtil {
    +
    +  /**
    +   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink 
    +   * operator functions:
    +   * [[org.apache.flink.api.common.functions.MapFunction]] and 
    +   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
    +   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well). 
    +   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the 
    +   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
    +   * format:
    +   *
    +   * {{{
    +   *                   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
    +   *                             |                          |
    +   *                             v                          v
    +   *        +---------+---------+--------+--------+--------+--------+
    +   *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
    +   *        +---------+---------+--------+--------+--------+--------+
    +   *                                              ^
    +   *                                              |
    +   *                               sum(y) aggOffsetInRow = 4
    +   * }}}
    +   *
    +   */
    +  def createOperatorFunctionsForAggregates(aggregate: FlinkAggregate,
    +      inputType: RelDataType, outputType: RelDataType,
    +      groupings: Array[Int]): AggregateResult = {
    +
    +    val aggregateCalls: Seq[AggregateCall] = aggregate.getAggCallList
    +    // store the aggregate fields of each aggregate function, by the same order of aggregates.
    +    val aggFieldIndexes = new Array[Int](aggregateCalls.size)
    +    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
    +
    +    transformToAggregateFunctions(aggregateCalls, aggFieldIndexes,
    +      aggregates, inputType, groupings.length)
    +
    +    val mapFunction = new AggregateMapFunction(aggregates, aggFieldIndexes, groupings)
    +
    +    val bufferDataType: RelRecordType =
    +      createAggregateBufferDataType(groupings, aggregates, inputType)
    +
    +    // the mapping relation between field index of intermediate aggregate Row and output Row.
    +    var groupingOffsetMapping = ArrayBuffer[(Int, Int)]()
    +
    +    // the mapping relation between aggregate function index in list and its corresponding
    +    // field index in output Row.
    +    var aggOffsetMapping = ArrayBuffer[(Int, Int)]()
    +
    +
    +    outputType.getFieldList.zipWithIndex.foreach {
    --- End diff --
    
    I find this part hard to follow, partly because it mixes the initialization of `groupingOffsetMapping` and `aggOffsetMapping`. How about extending `getMatchedAggregateIndex` and `getMatchedFieldIndex` so that they completely handle the initialization of `aggOffsetMapping` and `groupingOffsetMapping`, respectively?
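
A minimal sketch of what that separation could look like, not the code from the PR. `OutputType` is a hypothetical stand-in for the Calcite row type, and matching fields by name is an assumption made only for illustration. Each helper builds one mapping on its own, as `(output field index, source index)` pairs, which is the `(after, previous)` order consumed by the reduce functions in this PR.

```scala
object OffsetMappings {
  // Hypothetical stand-in for the output row type: just the ordered field names.
  final case class OutputType(fieldNames: Seq[String])

  // (output field index, index of the group key in the intermediate Row)
  def groupingOffsetMapping(output: OutputType, groupKeys: Seq[String]): Array[(Int, Int)] =
    output.fieldNames.zipWithIndex.collect {
      case (name, outIdx) if groupKeys.contains(name) => (outIdx, groupKeys.indexOf(name))
    }.toArray

  // (output field index, index of the aggregate function in the aggregate list)
  def aggOffsetMapping(output: OutputType, aggNames: Seq[String]): Array[(Int, Int)] =
    output.fieldNames.zipWithIndex.collect {
      case (name, outIdx) if aggNames.contains(name) => (outIdx, aggNames.indexOf(name))
    }.toArray
}
```

With two independent helpers, each mapping can be read and tested without following the other one through a shared loop.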


[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54548872
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction, RichMapPartitionFunction}
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    + * It wraps the aggregate logic inside of 
    + * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
    + *
    + * @param aggregates   The aggregate functions.
    + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row 
    + *                         and output Row.
    + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
    + *                         index in output Row.
    + */
    +class AggregateReduceGroupFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)])
    +    extends RichGroupReduceFunction[Row, Row] {
    +
    +  private final val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(groupKeysMapping)
    +  }
    +
    +  /**
    +   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    +   * calculate aggregated values output by aggregate buffer, and set them into output 
    +   * Row based on the mapping relation between intermediate aggregate data and output data.
    +   *
    +   * @param records  Grouped intermediate aggregate Rows iterator.
    +   * @param out The collector to hand results to.
    +   *
    +   */
    +  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
    +    
    +    val inputItr = records.iterator
    +    val buffer = inputItr.next
    +    while (inputItr.hasNext) {
    +      val next = inputItr.next
    +      aggregates.foreach(_.merge(next, buffer))
    +    }
    +
    +    val output: Row = new Row(finalRowLength)
    +
    +    groupKeysMapping.map {
    +      case (after, previous) =>
    +        output.setField(after, buffer.productElement(previous))
    +    }
    +
    +    aggregateMapping.map {
    +      case (after, previous) =>
    +        output.setField(after, aggregates(previous).evaluate(buffer))
    +    }
    +
    +    out.collect(output)
    +  }
    +}
    +
    +/**
    + * It wraps the aggregate logic inside of 
    + * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and 
    + * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
    + *
    + * @param aggregates   The aggregate functions.
    + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row 
    + *                         and output Row.
    + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
    + *                         index in output Row.
    + */
    +class AggregateReduceCombineFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)])
    +    extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] {
    +
    +  private val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(groupKeysMapping)
    +  }
    +
    +  /**
    +   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    +   * calculate aggregated values output by aggregate buffer, and set them into output 
    +   * Row based on the mapping relation between intermediate aggregate Row and output Row.
    +   *
    +   * @param records  Grouped intermediate aggregate Rows iterator.
    +   * @param out The collector to hand results to.
    +   *
    +   */
    +  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
    +    val inputItr = records.iterator
    +    val buffer = inputItr.next
    +    while (inputItr.hasNext) {
    +      val next = inputItr.next
    +      aggregates.foreach(_.merge(next, buffer))
    +    }
    +
    +    val output: Row = new Row(finalRowLength)
    --- End diff --
    
    Reuse `output` object.
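
A small sketch of the reuse pattern, under stated assumptions: `Row` and `SumReducer` below are simplified stand-ins, not the Flink classes, and `open()` only imitates the lifecycle hook of a rich function. The output row is allocated once and overwritten on every `reduce` call instead of being created per group.

```scala
object OutputReuse {
  // Minimal stand-in for the Table API Row: a mutable, fixed-arity record.
  final class Row(arity: Int) {
    private val fields = new Array[Any](arity)
    def setField(i: Int, v: Any): Unit = { fields(i) = v }
    def productElement(i: Int): Any = fields(i)
  }

  // Hypothetical reducer that emits (key, sum) and reuses a single output Row.
  final class SumReducer(finalRowLength: Int) {
    private var output: Row = _

    // Allocate the output object once, before any group is processed.
    def open(): Unit = { output = new Row(finalRowLength) }

    // Overwrite the fields of the shared Row and hand out the same instance.
    def reduce(key: String, values: Iterable[Int]): Row = {
      output.setField(0, key)
      output.setField(1, values.sum)
      output
    }
  }
}
```

The pattern is only safe if whoever receives the row finishes with it, or copies it, before the next `reduce` call overwrites the fields.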


[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r55010716
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction, RichMapPartitionFunction}
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    + * It wraps the aggregate logic inside of 
    + * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
    + *
    + * @param aggregates   The aggregate functions.
    + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row 
    + *                         and output Row.
    + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
    + *                         index in output Row.
    + */
    +class AggregateReduceGroupFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val intermediateRowArity: Int)
    +    extends RichGroupReduceFunction[Row, Row] {
    +
    +  private val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
    +  private var aggregateBuffer: Row = _
    +  private var output: Row = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(groupKeysMapping)
    +    aggregateBuffer = new Row(intermediateRowArity)
    +    output = new Row(finalRowLength)
    +  }
    +
    +  /**
    +   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    +   * calculate aggregated values output by aggregate buffer, and set them into output 
    +   * Row based on the mapping relation between intermediate aggregate data and output data.
    +   *
    +   * @param records  Grouped intermediate aggregate Rows iterator.
    +   * @param out The collector to hand results to.
    +   *
    +   */
    +  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
    +
    +    // Initiate intermediate aggregate value.
    +    aggregates.foreach(_.initiate(aggregateBuffer))
    +
    +    // Merge intermediate aggregate value to buffer.
    +    var last: Row = null
    +    records.foreach((record) => {
    +      aggregates.foreach(_.merge(record, aggregateBuffer))
    +      last = record
    +    })
    +
    +    // Set group keys value to final output.
    +    groupKeysMapping.map {
    +      case (after, previous) =>
    +        output.setField(after, last.productElement(previous))
    +    }
    +
    +    // Evaluate final aggregate value and set to output.
    +    aggregateMapping.map {
    +      case (after, previous) =>
    +        output.setField(after, aggregates(previous).evaluate(aggregateBuffer))
    +    }
    +
    +    out.collect(output)
    +  }
    +}
    +
    +/**
    + * It wraps the aggregate logic inside of 
    + * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and 
    + * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
    + *
    + * @param aggregates   The aggregate functions.
    + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row 
    + *                         and output Row.
    + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
    + *                         index in output Row.
    + */
    +class AggregateReduceCombineFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val intermediateRowArity: Int)
    +    extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] {
    +
    +  private val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
    --- End diff --
    
    This can be computed in `open` and does not need to be a member `val`.


[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54838416
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala ---
    @@ -17,26 +17,77 @@
      */
     package org.apache.flink.api.table.runtime.aggregate
     
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
    +
     /**
    - * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it
    - * with grouped aggregate field values, and finally get the aggregated value.
    - * @tparam T the output type
    + * The interface for all Flink aggregate functions, which expressed in terms of initiate(),
    + * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases:
    + * -- In Map phase, use prepare() to transform aggregate field value into intermediate
    + * aggregate value.
    + * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values
    + * into aggregate buffer. Then use evaluate() to calculate the final aggregated value.
    + * For associative decomposable aggregate functions, they support partial aggregate. To optimize
    + * the performance, a Combine phase would be added between Map phase and GroupReduce phase,
    + * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values
    + * into aggregate buffer.
    + *
    + * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start
    + * field index in Row, so different aggregate functions could share the same Row as intermediate
    + * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields
    + * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM
    + * primitives, and Flink use intermediateDataType() to get its data types in SQL side.
    + *
    + * @tparam T Aggregated value type.
      */
     trait Aggregate[T] extends Serializable {
    +
    +  protected var aggOffsetInRow: Int = _
    +
       /**
    -   * Initialize the aggregate state.
    +   * Initiate the intermediate aggregate value in Row.
    +   * @param intermediate
        */
    -  def initiateAggregate
    +  def initiate(intermediate: Row): Unit
     
       /**
    -   * Feed the aggregate field value.
    +   * Transform the aggregate field value into intermediate aggregate data.
        * @param value
    +   * @param intermediate
        */
    -  def aggregate(value: Any)
    +  def prepare(value: Any, intermediate: Row): Unit
    --- End diff --
    
    So null value handling should be done inside the aggregate function implementations rather than at the aggregate interface level. There is an umbrella JIRA about null value handling in the Table API (FLINK-3139); I would create a subtask that adds null value handling for the COUNT/SUM/MIN/MAX/AVG aggregates introduced in this PR. Is that OK?
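
To illustrate what handling nulls inside an implementation might look like, here is a sketch only: `Row` is a simplified stand-in, `LongSum` is a hypothetical class, and the method names merely mirror the `initiate`/`prepare`/`merge`/`evaluate` shape of the interface in the diff. It follows the usual SQL convention that SUM ignores null inputs and yields null when every input is null; the eventual subtask may well choose a different design.

```scala
object NullAwareSum {
  // Minimal stand-in for the Table API Row.
  final class Row(arity: Int) {
    private val fields = new Array[Any](arity)
    def setField(i: Int, v: Any): Unit = { fields(i) = v }
    def productElement(i: Int): Any = fields(i)
  }

  // Hypothetical SUM over Long values stored in one Row field at `offset`.
  final class LongSum(offset: Int) {
    // An empty buffer holds null, meaning "no non-null input seen yet".
    def initiate(buffer: Row): Unit = buffer.setField(offset, null)

    // A null input simply stays null in the intermediate Row.
    def prepare(value: Any, intermediate: Row): Unit = intermediate.setField(offset, value)

    // Null intermediates are skipped; the first non-null value seeds the buffer.
    def merge(intermediate: Row, buffer: Row): Unit =
      (intermediate.productElement(offset), buffer.productElement(offset)) match {
        case (null, _)            => ()
        case (v: Long, null)      => buffer.setField(offset, v)
        case (v: Long, acc: Long) => buffer.setField(offset, v + acc)
        case (other, _)           => throw new IllegalArgumentException(s"Unexpected value: $other")
      }

    def evaluate(buffer: Row): Any = buffer.productElement(offset)
  }
}
```

Because the null checks live in `merge`, the interface itself stays unchanged, which is the split proposed in the comment above.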


[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1746#issuecomment-191343405
  
    Yes, you are right. 
    We need a `RichCombineToGroupCombineWrapper` and inject it as a temporary fix until we have dedicated drivers for `CombineFunction`. 
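
A rough sketch of the adapter idea. The three traits are local stand-ins whose method shapes follow Flink's `Collector`, `CombineFunction` (returns one combined value) and `GroupCombineFunction` (emits through a collector); `CombineToGroupCombineAdapter` is a hypothetical name, and the rich-function lifecycle (`open`, `close`, runtime context) that a real wrapper would have to forward is left out.

```scala
object CombineWrapperSketch {
  // Local stand-ins mirroring the shape of the Flink interfaces.
  trait Collector[T] { def collect(record: T): Unit }
  trait CombineFunction[IN, OUT] { def combine(values: Iterable[IN]): OUT }
  trait GroupCombineFunction[IN, OUT] {
    def combine(values: Iterable[IN], out: Collector[OUT]): Unit
  }

  // Adapts a value-returning CombineFunction to the collector-based interface.
  final class CombineToGroupCombineAdapter[IN, OUT](wrapped: CombineFunction[IN, OUT])
      extends GroupCombineFunction[IN, OUT] {
    override def combine(values: Iterable[IN], out: Collector[OUT]): Unit =
      out.collect(wrapped.combine(values))
  }
}
```

The adapter emits exactly one record per call, so it preserves the one-result-per-group contract of the wrapped function.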


[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54564821
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala ---
    @@ -17,68 +17,96 @@
      */
     package org.apache.flink.api.table.runtime.aggregate
     
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
    +
     abstract class MaxAggregate[T: Numeric] extends Aggregate[T] {
     
    -  var result: T = _
    -  val numericResult = implicitly[Numeric[T]]
    +  private val numeric = implicitly[Numeric[T]]
     
    -  override def aggregate(value: Any): Unit = {
    -    val input: T = value.asInstanceOf[T]
    +  /**
    +   * Initiate the partial aggregate value in Row.
    +   * @param intermediate
    +   */
    +  override def initiate(intermediate: Row): Unit = {
    +    intermediate.setField(aggOffsetInRow, numeric.zero)
    --- End diff --
    
    This should initialize to the minimum value of the data type instead of `numeric.zero`.
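
A short worked example of why the starting value matters. `maxFrom` is a hypothetical helper that imitates `initiate` followed by repeated merging; it is not code from the PR.

```scala
object MaxInit {
  // Folds with an explicit starting value, like initiate() followed by merge() calls.
  def maxFrom(start: Int, values: Seq[Int]): Int =
    values.foldLeft(start)((acc, v) => math.max(acc, v))

  val negatives: Seq[Int] = Seq(-5, -2, -9)

  // Starting from zero reports a maximum that is not in the input at all.
  val withZero: Int = maxFrom(0, negatives)

  // Starting from the type's minimum value yields the true maximum.
  val withMinValue: Int = maxFrom(Int.MinValue, negatives)
}
```

For the all-negative input, `withZero` is 0 while `withMinValue` is -2. Note that Scala's `Numeric[T]` offers `zero` and `one` but no minimum value, so the starting value would likely have to be supplied per concrete type.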


[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on the pull request:

    https://github.com/apache/flink/pull/1746#issuecomment-193636206
  
    Merging this PR.


[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54548849
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction, RichMapPartitionFunction}
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    + * It wraps the aggregate logic inside of 
    + * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
    + *
    + * @param aggregates   The aggregate functions.
    + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row 
    + *                         and output Row.
    + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
    + *                         index in output Row.
    + */
    +class AggregateReduceGroupFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)])
    +    extends RichGroupReduceFunction[Row, Row] {
    +
    +  private final val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(groupKeysMapping)
    +  }
    +
    +  /**
    +   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    +   * calculate aggregated values output by aggregate buffer, and set them into output 
    +   * Row based on the mapping relation between intermediate aggregate data and output data.
    +   *
    +   * @param records  Grouped intermediate aggregate Rows iterator.
    +   * @param out The collector to hand results to.
    +   *
    +   */
    +  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
    +    
    +    val inputItr = records.iterator
    +    val buffer = inputItr.next
    +    while (inputItr.hasNext) {
    +      val next = inputItr.next
    +      aggregates.foreach(_.merge(next, buffer))
    +    }
    +
    +    val output: Row = new Row(finalRowLength)
    +
    +    groupKeysMapping.map {
    +      case (after, previous) =>
    +        output.setField(after, buffer.productElement(previous))
    +    }
    +
    +    aggregateMapping.map {
    +      case (after, previous) =>
    +        output.setField(after, aggregates(previous).evaluate(buffer))
    +    }
    +
    +    out.collect(output)
    +  }
    +}
    +
    +/**
    + * It wraps the aggregate logic inside of 
    + * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and 
    + * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
    + *
    + * @param aggregates   The aggregate functions.
    + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row 
    + *                         and output Row.
    + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
    + *                         index in output Row.
    + */
    +class AggregateReduceCombineFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)])
    +    extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] {
    +
    +  private val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(groupKeysMapping)
    +  }
    +
    +  /**
    +   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    +   * calculate aggregated values output by aggregate buffer, and set them into output 
    +   * Row based on the mapping relation between intermediate aggregate Row and output Row.
    +   *
    +   * @param records  Grouped intermediate aggregate Rows iterator.
    +   * @param out The collector to hand results to.
    +   *
    +   */
    +  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
    +    val inputItr = records.iterator
    +    val buffer = inputItr.next
    +    while (inputItr.hasNext) {
    +      val next = inputItr.next
    --- End diff --
    
    Same object reuse issue as above.
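
One reading of the object reuse concern, sketched with stand-in types: if the runtime hands out the same mutable instance on every `next()` call, a reference kept from the first call (like `buffer` in the diff) is silently overwritten. `Box` and `ReusingIterator` are hypothetical and only imitate that behaviour; whether the iterator actually reuses objects depends on the runtime configuration.

```scala
object ReuseHazard {
  final class Box(var value: Int)

  // Iterator that returns the same mutable instance on every call to next().
  final class ReusingIterator(values: Seq[Int]) extends Iterator[Box] {
    private val shared = new Box(0)
    private val underlying = values.iterator
    def hasNext: Boolean = underlying.hasNext
    def next(): Box = { shared.value = underlying.next(); shared }
  }

  // Keeps the first record as the buffer: under reuse, buffer and next alias each other.
  def sumHoldingFirst(it: Iterator[Box]): Int = {
    val buffer = it.next()
    while (it.hasNext) {
      val next = it.next()
      buffer.value += next.value
    }
    buffer.value
  }

  // Accumulates into a buffer owned by the function, so reuse cannot corrupt it.
  def sumWithOwnBuffer(it: Iterator[Box]): Int = {
    val buffer = new Box(0)
    while (it.hasNext) buffer.value += it.next().value
    buffer.value
  }
}
```

For the input 1, 2, 4 the first variant returns 8 instead of 7, because the held record and the current record are the same object; the second variant returns 7.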


[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54565116
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala ---
    @@ -17,26 +17,77 @@
      */
     package org.apache.flink.api.table.runtime.aggregate
     
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
    +
     /**
    - * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it
    - * with grouped aggregate field values, and finally get the aggregated value.
    - * @tparam T the output type
    + * The interface for all Flink aggregate functions, which expressed in terms of initiate(),
    + * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases:
    + * -- In Map phase, use prepare() to transform aggregate field value into intermediate
    + * aggregate value.
    + * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values
    + * into aggregate buffer. Then use evaluate() to calculate the final aggregated value.
    + * For associative decomposable aggregate functions, they support partial aggregate. To optimize
    + * the performance, a Combine phase would be added between Map phase and GroupReduce phase,
    + * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values
    + * into aggregate buffer.
    + *
    + * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start
    + * field index in Row, so different aggregate functions could share the same Row as intermediate
    + * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields
    + * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM
    + * primitives, and Flink use intermediateDataType() to get its data types in SQL side.
    + *
    + * @tparam T Aggregated value type.
      */
     trait Aggregate[T] extends Serializable {
    +
    +  protected var aggOffsetInRow: Int = _
    +
       /**
    -   * Initialize the aggregate state.
    +   * Initiate the intermediate aggregate value in Row.
    +   * @param intermediate
        */
    -  def initiateAggregate
    +  def initiate(intermediate: Row): Unit
     
       /**
    -   * Feed the aggregate field value.
    +   * Transform the aggregate field value into intermediate aggregate data.
        * @param value
    +   * @param intermediate
        */
    -  def aggregate(value: Any)
    +  def prepare(value: Any, intermediate: Row): Unit
    --- End diff --
    
    The aggregation functions are not `null`-safe. This can be fixed by checking for `null` in `prepare()` and calling `initiate()` instead when the value is `null`.
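
A minimal sketch of that suggestion, under stated assumptions: `NullCheckRow` is a simplified, hypothetical stand-in for `org.apache.flink.api.table.Row`, and `NullSafeIntSum` is an illustrative class, not one from the PR. It only shows the shape of the `null` check.

```scala
// Hypothetical, simplified stand-in for org.apache.flink.api.table.Row.
final class NullCheckRow(arity: Int) {
  private val fields = new Array[Any](arity)
  def setField(i: Int, v: Any): Unit = { fields(i) = v }
  def productElement(i: Int): Any = fields(i)
}

// Illustrative sum over Int inputs with a Long intermediate value.
final class NullSafeIntSum(offset: Int) {
  // Writes the neutral element into the intermediate Row.
  def initiate(intermediate: NullCheckRow): Unit =
    intermediate.setField(offset, 0L)

  // Falls back to initiate() when the input value is null.
  def prepare(value: Any, intermediate: NullCheckRow): Unit =
    if (value == null) {
      initiate(intermediate)
    } else {
      intermediate.setField(offset, value.asInstanceOf[Int].toLong)
    }
}
```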



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54706319
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala ---
    @@ -17,26 +17,77 @@
      */
     package org.apache.flink.api.table.runtime.aggregate
     
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
    +
     /**
    - * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it
    - * with grouped aggregate field values, and finally get the aggregated value.
    - * @tparam T the output type
    + * The interface for all Flink aggregate functions, which expressed in terms of initiate(),
    + * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases:
    + * -- In Map phase, use prepare() to transform aggregate field value into intermediate
    + * aggregate value.
    + * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values
    + * into aggregate buffer. Then use evaluate() to calculate the final aggregated value.
    + * For associative decomposable aggregate functions, they support partial aggregate. To optimize
    + * the performance, a Combine phase would be added between Map phase and GroupReduce phase,
    + * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values
    + * into aggregate buffer.
    + *
    + * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start
    + * field index in Row, so different aggregate functions could share the same Row as intermediate
    + * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields
    + * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM
    + * primitives, and Flink use intermediateDataType() to get its data types in SQL side.
    + *
    + * @tparam T Aggregated value type.
      */
     trait Aggregate[T] extends Serializable {
    +
    +  protected var aggOffsetInRow: Int = _
    +
       /**
    -   * Initialize the aggregate state.
    +   * Initiate the intermediate aggregate value in Row.
    +   * @param intermediate
        */
    -  def initiateAggregate
    +  def initiate(intermediate: Row): Unit
     
       /**
    -   * Feed the aggregate field value.
    +   * Transform the aggregate field value into intermediate aggregate data.
        * @param value
    +   * @param intermediate
        */
    -  def aggregate(value: Any)
    +  def prepare(value: Any, intermediate: Row): Unit
    --- End diff --
    
    Aggregate functions may handle `null` in different ways: some ignore it, while others treat it as a specific value. I'm not sure it is a good idea to handle it with `initiate()` in all cases.
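
To make the difference concrete, here is a small sketch assuming standard SQL semantics, written over plain Scala collections rather than the PR's `Aggregate` trait. The names `sqlCount` and `sqlSum` are hypothetical. `COUNT(x)` over only-`NULL` input is 0, while `SUM(x)` over only-`NULL` input is `NULL`, so an intermediate value initialized to 0 cannot represent both cases.

```scala
object NullSemanticsSketch {
  // COUNT(x): NULL inputs are skipped; the result over only-NULL input is 0.
  def sqlCount(values: Seq[Any]): Long = values.count(_ != null).toLong

  // SUM(x): NULL inputs are skipped; the result over only-NULL input is NULL,
  // modelled here as None.
  def sqlSum(values: Seq[java.lang.Long]): Option[Long] = {
    val nonNull = values.filter(_ != null).map(_.longValue)
    if (nonNull.isEmpty) None else Some(nonNull.sum)
  }
}
```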



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi closed the pull request at:

    https://github.com/apache/flink/pull/1746



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r55010392
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.RichMapFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.configuration.Configuration
    +
    +class AggregateMapFunction[IN, OUT](
    +    private val aggregates: Array[Aggregate[_]],
    +    private val aggFields: Array[Int],
    +    private val groupingKeys: Array[Int],
    +    @transient private val returnType: TypeInformation[OUT])
    +    extends RichMapFunction[IN, OUT] with ResultTypeQueryable[OUT] {
    +  
    +  private var partialRowLength: Int = _
    --- End diff --
    
    No need to make this a member variable.



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on the pull request:

    https://github.com/apache/flink/pull/1746#issuecomment-193631851
  
    FLINK-3586 has been created to track the Long AVG issue.



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r55134609
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala ---
    @@ -17,115 +17,144 @@
      */
     package org.apache.flink.api.table.runtime.aggregate
     
    -import java.math.BigInteger
     import com.google.common.math.LongMath
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
     
    -// for byte, short, int we return int
    -abstract class IntegralAvgAggregate[T: Numeric] extends Aggregate[T] {
    +abstract class IntegralAvgAggregate[T] extends Aggregate[T] {
    +  private final val intermediateType = Array(SqlTypeName.BIGINT, SqlTypeName.BIGINT)
       
    -  var sum: Long = 0
    -  var count: Long = 0
    +  override def initiate(partial: Row): Unit = {
    +    partial.setField(aggOffsetInRow, 0L)
    +    partial.setField(aggOffsetInRow + 1, 0L)
    +  }
    +
    +  override def merge(partial: Row, buffer: Row): Unit = {
    +    val partialSum = partial.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val partialCount = partial.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    val bufferSum = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val bufferCount = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    buffer.setField(aggOffsetInRow, LongMath.checkedAdd(partialSum, bufferSum))
    +    buffer.setField(aggOffsetInRow + 1, LongMath.checkedAdd(partialCount, bufferCount))
    +  }
     
    -  override def initiateAggregate: Unit = {
    -    sum = 0
    -    count = 0
    +  override def intermediateDataType: Array[SqlTypeName] = {
    +    intermediateType
       }
     
    +  override def supportPartial: Boolean = true
     }
     
     class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
    -
    -  override def aggregate(value: Any): Unit = {
    -    count += 1
    -    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Byte])
    +  override def prepare(value: Any, partial: Row): Unit = {
    +    val input = value.asInstanceOf[Byte]
    +    partial.setField(aggOffsetInRow, input.toLong)
    +    partial.setField(aggOffsetInRow + 1, 1L)
       }
     
    -  override def getAggregated(): Byte = {
    -    (sum / count).toByte
    +  override def evaluate(buffer: Row): Byte = {
    +    val bufferSum = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val bufferCount = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    (bufferSum / bufferCount).toByte
       }
     }
     
     class ShortAvgAggregate extends IntegralAvgAggregate[Short] {
     
    -  override def aggregate(value: Any): Unit = {
    -    count += 1
    -    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Short])
    +  override def prepare(value: Any, partial: Row): Unit = {
    +    val input = value.asInstanceOf[Short]
    +    partial.setField(aggOffsetInRow, input.toLong)
    +    partial.setField(aggOffsetInRow + 1, 1L)
       }
     
    -  override def getAggregated(): Short = {
    -    (sum / count).toShort
    +  override def evaluate(buffer: Row): Short = {
    +    val bufferSum = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val bufferCount = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    (bufferSum / bufferCount).toShort
       }
     }
     
     class IntAvgAggregate extends IntegralAvgAggregate[Int] {
     
    -  override def aggregate(value: Any): Unit = {
    -    count += 1
    -    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Int])
    +  override def prepare(value: Any, partial: Row): Unit = {
    +    val input = value.asInstanceOf[Int]
    +    partial.setField(aggOffsetInRow, input.toLong)
    --- End diff --
    
    Previously, `BigInteger` was used inside the aggregate function to store the sum. For partial aggregation, the partial sum and count need to be transferred between Flink operators, so they become part of the data type. There is no corresponding type for `BigInteger` in Calcite's `SqlTypeName`, and I have not found a suitable solution for this yet.
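
The trade-off can be sketched as follows. This is an illustration only: it uses `java.lang.Math.addExact` from the JDK in place of Guava's `LongMath.checkedAdd` to stay dependency-free (both throw `ArithmeticException` on overflow), and the object and method names are hypothetical.

```scala
import java.math.BigInteger

object AvgOverflowSketch {
  // Long-based partial sum: fails with ArithmeticException once the sum
  // no longer fits into a Long.
  def checkedLongSum(values: Seq[Long]): Long =
    values.foldLeft(0L)((acc, v) => java.lang.Math.addExact(acc, v))

  // BigInteger-based average: the sum cannot overflow, but BigInteger has
  // no direct counterpart among the SQL types used for the intermediate Row.
  def bigIntegerAvg(values: Seq[Long]): Long = {
    val sum = values.foldLeft(BigInteger.ZERO)((acc, v) => acc.add(BigInteger.valueOf(v)))
    sum.divide(BigInteger.valueOf(values.size.toLong)).longValue
  }
}
```

With two inputs of `Long.MaxValue`, the `Long`-based sum throws, whereas the `BigInteger`-based average still returns `Long.MaxValue`.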



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54555220
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -0,0 +1,309 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.calcite.rel.`type`._
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
    +import org.apache.calcite.sql.fun._
    +import org.apache.flink.api.common.functions.{GroupReduceFunction, MapFunction}
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.api.table.plan.PlanGenException
    +import org.apache.flink.api.table.plan.nodes.logical.FlinkAggregate
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +object AggregateUtil {
    +
    +  /**
    +   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink 
    +   * operator functions:
    +   * [[org.apache.flink.api.common.functions.MapFunction]] and 
    +   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
    +   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well). 
    +   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the 
    +   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
    +   * format:
    +   *
    +   * {{{
    +   *                   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
    +   *                             |                          |
    +   *                             v                          v
    +   *        +---------+---------+--------+--------+--------+--------+
    +   *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
    +   *        +---------+---------+--------+--------+--------+--------+
    +   *                                              ^
    +   *                                              |
    +   *                               sum(y) aggOffsetInRow = 4
    +   * }}}
    +   *
    +   */
    +  def createOperatorFunctionsForAggregates(aggregate: FlinkAggregate,
    +      inputType: RelDataType, outputType: RelDataType,
    +      groupings: Array[Int]): AggregateResult = {
    +
    +    val aggregateCalls: Seq[AggregateCall] = aggregate.getAggCallList
    +    // store the aggregate fields of each aggregate function, by the same order of aggregates.
    +    val aggFieldIndexes = new Array[Int](aggregateCalls.size)
    +    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
    +
    +    transformToAggregateFunctions(aggregateCalls, aggFieldIndexes,
    +      aggregates, inputType, groupings.length)
    +
    +    val mapFunction = new AggregateMapFunction(aggregates, aggFieldIndexes, groupings)
    +
    +    val bufferDataType: RelRecordType =
    +      createAggregateBufferDataType(groupings, aggregates, inputType)
    +
    +    // the mapping relation between field index of intermediate aggregate Row and output Row.
    +    var groupingOffsetMapping = ArrayBuffer[(Int, Int)]()
    +
    +    // the mapping relation between aggregate function index in list and its corresponding
    +    // field index in output Row.
    +    var aggOffsetMapping = ArrayBuffer[(Int, Int)]()
    +
    +
    +    outputType.getFieldList.zipWithIndex.foreach {
    +      case (fieldType: RelDataTypeField, outputIndex: Int) =>
    +
    +        val aggregateIndex: Int = getMatchedAggregateIndex(aggregate, fieldType)
    +        if (aggregateIndex != -1) {
    +          aggOffsetMapping += ((outputIndex, aggregateIndex))
    +        } else {
    +          val groupKeyIndex: Int = getMatchedFieldIndex(inputType, fieldType, groupings)
    +          if (groupKeyIndex != -1) {
    +            groupingOffsetMapping += ((outputIndex, groupKeyIndex))
    +          } else {
    +            throw new PlanGenException("Could not find output field in input data type " +
    +                "or aggregate function.")
    +          }
    +        }
    +    }
    +
    +    val allPartialAggregate = aggregates.map(_.supportPartial).foldLeft(true)(_ && _)
    --- End diff --
    
    You can do this with `reduce` to avoid the initial `true` element.
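
For comparison, a short sketch of the variants over a plain `Array[Boolean]` standing in for the `supportPartial` flags. The object and method names are hypothetical. One caveat worth noting: `reduce` throws on an empty collection, so it is only equivalent if at least one aggregate is always present, which is an assumption here. `forall` is a further alternative that was not proposed in the thread.

```scala
object AllPartialSketch {
  // Variant from the diff: starts from an explicit true element.
  def viaFoldLeft(flags: Array[Boolean]): Boolean = flags.foldLeft(true)(_ && _)

  // Suggested variant: no initial element, but throws
  // UnsupportedOperationException when flags is empty.
  def viaReduce(flags: Array[Boolean]): Boolean = flags.reduce(_ && _)

  // Alternative: true for an empty array, stops at the first false.
  def viaForall(flags: Array[Boolean]): Boolean = flags.forall(identity)
}
```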



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1746#issuecomment-193656571
  
    Thanks @ChengXiangLi for merging. The ASF bot only closes PRs which are merged into master. Can you close this PR manually? Thanks



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54550270
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala ---
    @@ -44,17 +45,24 @@ class DataSetAggregateRule
         val inputType = agg.getInput.getRowType()
     
         // add grouping fields, position keys in the input, and input type
    -    val aggregateFunction = AggregateFactory.createAggregateInstance(agg.getAggCallList,
    -        inputType, grouping)
    +    val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(agg,
    --- End diff --
    
    `createOperatorFunctionsForAggregates` only uses `agg.getAggCallList()`.
    Can you change `createOperatorFunctionsForAggregates` to expect a `List<AggregateCall>` instead of a `FlinkAggregate`?



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54544027
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala ---
    @@ -44,17 +45,24 @@ class DataSetAggregateRule
         val inputType = agg.getInput.getRowType()
     
         // add grouping fields, position keys in the input, and input type
    -    val aggregateFunction = AggregateFactory.createAggregateInstance(agg.getAggCallList,
    -        inputType, grouping)
    +    val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(agg,
    +        inputType, rel.getRowType, grouping)
    +
    +    val mapNode = new DataSetMap(rel.getCluster,
    +      traitSet,
    +      convInput,
    +      aggregateResult.intermediateDataType,
    +      agg.toString,
    +      aggregateResult.mapFunc)
     
         new DataSetGroupReduce(
           rel.getCluster,
           traitSet,
    -      convInput,
    +      mapNode,
           rel.getRowType,
           agg.toString,
    -      grouping,
    -      aggregateFunction)
    +      (0 until grouping.length).toArray,
    +      aggregateResult.reduceGroupFunc)
    --- End diff --
    
    Can we also update the `DataSetGroupReduce` to accept a function that generates the `GroupReduceFunction` instead of the `GroupReduceFunction` itself?
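
One possible reading of this request, sketched with hypothetical stand-in types (`SketchGroupReduce`, `EagerNode`, and `DeferredNode` are not Flink or PR classes): the plan node receives a factory and only builds the function when it is translated, instead of holding a ready-made instance. The actual signature the reviewer has in mind is not stated in the thread.

```scala
// Hypothetical stand-in for a group reduce function.
trait SketchGroupReduce {
  def reduce(values: Iterator[Long]): Long
}

// Node that receives the function instance directly.
final class EagerNode(val func: SketchGroupReduce)

// Node that receives a factory and builds the function on translation.
final class DeferredNode(factory: () => SketchGroupReduce) {
  def translate(): SketchGroupReduce = factory()
}

object DeferredFunctionSketch {
  // Counts how many function instances have been constructed.
  var created = 0

  def newSum(): SketchGroupReduce = {
    created += 1
    new SketchGroupReduce {
      def reduce(values: Iterator[Long]): Long = values.sum
    }
  }
}
```

Constructing a `DeferredNode` with `() => DeferredFunctionSketch.newSum()` creates no function instance until `translate()` is called.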



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54547003
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.RichMapFunction
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.configuration.Configuration
    +
    +class AggregateMapFunction(
    +    private val aggregates: Array[Aggregate[_]],
    +    private val aggFields: Array[Int],
    +    private val groupingKeys: Array[Int]) extends RichMapFunction[Row, Row] {
    +  
    +  private final val partialRowLength = groupingKeys.length +
    +      aggregates.map(_.intermediateDataType.length).sum
    +  
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(aggFields)
    +    Preconditions.checkArgument(aggregates.size == aggFields.size)
    +  }
    +
    +  override def map(value: Row): Row = {
    +    
    +    val output = new Row(partialRowLength)
    --- End diff --
    
    We can create a single `output` object in `open()` and reuse it in each `map()` call.
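
A minimal sketch of that pattern, assuming an `open()`/`map()` lifecycle like the one `RichMapFunction` provides. `ReusableRow` and `ReusingMapSketch` are hypothetical stand-ins, not the PR's classes.

```scala
// Hypothetical, simplified stand-in for Row.
final class ReusableRow(val arity: Int) {
  private val fields = new Array[Any](arity)
  def setField(i: Int, v: Any): Unit = { fields(i) = v }
  def productElement(i: Int): Any = fields(i)
}

// Illustrative map function that allocates its output once in open().
final class ReusingMapSketch(rowLength: Int) {
  private var output: ReusableRow = _

  // Called once before any map() call.
  def open(): Unit = {
    output = new ReusableRow(rowLength)
  }

  // Overwrites and returns the same output instance on every call.
  def map(value: Long): ReusableRow = {
    output.setField(0, value)
    output
  }
}
```

This avoids one allocation per record, but it is only safe if nothing downstream keeps a reference to a previously returned row, since each call overwrites the same instance.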



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54838441
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -0,0 +1,198 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction, RichMapPartitionFunction}
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    + * It wraps the aggregate logic inside of 
    + * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
    + *
    + * @param aggregates   The aggregate functions.
    + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row 
    + *                         and output Row.
    + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
    + *                         index in output Row.
    + */
    +class AggregateReduceGroupFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val intermediateRowArity: Int)
    +    extends RichGroupReduceFunction[Row, Row] {
    +
    +  private val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
    +  private var aggregateBuffer: Row = _
    +  private var output: Row = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(groupKeysMapping)
    +    aggregateBuffer = new Row(intermediateRowArity)
    +    output = new Row(finalRowLength)
    +  }
    +
    +  /**
    +   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    +   * calculate aggregated values output by aggregate buffer, and set them into output 
    +   * Row based on the mapping relation between intermediate aggregate data and output data.
    +   *
    +   * @param records  Grouped intermediate aggregate Rows iterator.
    +   * @param out The collector to hand results to.
    +   *
    +   */
    +  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
    +
    +    // Initiate intermediate aggregate value.
    +    aggregates.foreach(_.initiate(aggregateBuffer))
    +
    +    // Merge intermediate aggregate value to buffer.
    +    var last: Row = null
    +    records.foreach((record) =>  {
    +      aggregates.foreach(_.merge(record, aggregateBuffer))
    +      last = record
    +    })
    +
    +    // Set group keys to aggregateBuffer.
    +    for (i <- 0 until groupKeysMapping.length) {
    --- End diff --
    
    Yes, I will update this.



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54761986
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala ---
    @@ -17,26 +17,77 @@
      */
     package org.apache.flink.api.table.runtime.aggregate
     
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
    +
     /**
    - * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it
    - * with grouped aggregate field values, and finally get the aggregated value.
    - * @tparam T the output type
    + * The interface for all Flink aggregate functions, which expressed in terms of initiate(),
    + * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases:
    + * -- In Map phase, use prepare() to transform aggregate field value into intermediate
    + * aggregate value.
    + * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values
    + * into aggregate buffer. Then use evaluate() to calculate the final aggregated value.
    + * For associative decomposable aggregate functions, they support partial aggregate. To optimize
    + * the performance, a Combine phase would be added between Map phase and GroupReduce phase,
    + * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values
    + * into aggregate buffer.
    + *
    + * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start
    + * field index in Row, so different aggregate functions could share the same Row as intermediate
    + * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields
    + * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM
    + * primitives, and Flink use intermediateDataType() to get its data types in SQL side.
    + *
    + * @tparam T Aggregated value type.
      */
     trait Aggregate[T] extends Serializable {
    +
    +  protected var aggOffsetInRow: Int = _
    +
       /**
    -   * Initialize the aggregate state.
    +   * Initiate the intermediate aggregate value in Row.
    +   * @param intermediate
        */
    -  def initiateAggregate
    +  def initiate(intermediate: Row): Unit
     
       /**
    -   * Feed the aggregate field value.
    +   * Transform the aggregate field value into intermediate aggregate data.
        * @param value
    +   * @param intermediate
        */
    -  def aggregate(value: Any)
    +  def prepare(value: Any, intermediate: Row): Unit
    --- End diff --
    
    Yes, this would be aggregation specific. 
    For example, for a `SUM` aggregation, `prepare` could insert a `0`, which is basically the same as what `initiate` would do. However, it is also OK to do it in `prepare` directly.
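
A minimal sketch of the lifecycle discussed here, for an `Int` `SUM`. It is not the PR's implementation: `SketchRow` is a hypothetical stand-in for `org.apache.flink.api.table.Row`, `offset` plays the role of `aggOffsetInRow`, and treating a `null` input as `0` is an assumption made only for illustration.

```scala
// Hypothetical stand-in for org.apache.flink.api.table.Row: a fixed-arity array of fields.
final class SketchRow(arity: Int) {
  private val fields = new Array[Any](arity)
  def setField(i: Int, v: Any): Unit = fields(i) = v
  def productElement(i: Int): Any = fields(i)
}

// Sketch of an Int SUM aggregate; `offset` plays the role of aggOffsetInRow.
class IntSumSketch(offset: Int) {
  // Write the neutral element into the aggregate buffer before merging.
  def initiate(buffer: SketchRow): Unit = buffer.setField(offset, 0)

  // Map phase: turn one input value into an intermediate value.
  // Assumption: a null input contributes the neutral element 0.
  def prepare(value: Any, intermediate: SketchRow): Unit =
    intermediate.setField(offset, if (value == null) 0 else value.asInstanceOf[Int])

  // Combine / GroupReduce phase: fold an intermediate value into the buffer.
  def merge(intermediate: SketchRow, buffer: SketchRow): Unit = {
    val a = intermediate.productElement(offset).asInstanceOf[Int]
    val b = buffer.productElement(offset).asInstanceOf[Int]
    buffer.setField(offset, a + b)
  }

  // Read the final result out of the buffer.
  def evaluate(buffer: SketchRow): Int = buffer.productElement(offset).asInstanceOf[Int]
}

object IntSumSketch {
  // Drives initiate -> prepare -> merge -> evaluate over a local sequence.
  def sum(values: Seq[Any]): Int = {
    val agg = new IntSumSketch(0)
    val buffer = new SketchRow(1)
    agg.initiate(buffer)
    values.foreach { v =>
      val intermediate = new SketchRow(1)
      agg.prepare(v, intermediate)
      agg.merge(intermediate, buffer)
    }
    agg.evaluate(buffer)
  }
}
```

Whether the neutral element is written in `initiate` or in `prepare` does not change the result here, because both write into the same field before `merge` reads it.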



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r55010678
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction, RichMapPartitionFunction}
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    + * It wraps the aggregate logic inside of 
    + * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
    + *
    + * @param aggregates   The aggregate functions.
    + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row 
    + *                         and output Row.
    + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
    + *                         index in output Row.
    + */
    +class AggregateReduceGroupFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val intermediateRowArity: Int)
    +    extends RichGroupReduceFunction[Row, Row] {
    +
    +  private val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
    --- End diff --
    
    This can be computed in `open` and does not need to be a member val.



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54763270
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -0,0 +1,198 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction, RichMapPartitionFunction}
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    + * It wraps the aggregate logic inside of 
    + * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
    + *
    + * @param aggregates   The aggregate functions.
    + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row 
    + *                         and output Row.
    + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
    + *                         index in output Row.
    + */
    +class AggregateReduceGroupFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)],
    +    private val intermediateRowArity: Int)
    +    extends RichGroupReduceFunction[Row, Row] {
    +
    +  private val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
    +  private var aggregateBuffer: Row = _
    +  private var output: Row = _
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(groupKeysMapping)
    +    aggregateBuffer = new Row(intermediateRowArity)
    +    output = new Row(finalRowLength)
    +  }
    +
    +  /**
    +   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    +   * calculate aggregated values output by aggregate buffer, and set them into output 
    +   * Row based on the mapping relation between intermediate aggregate data and output data.
    +   *
    +   * @param records  Grouped intermediate aggregate Rows iterator.
    +   * @param out The collector to hand results to.
    +   *
    +   */
    +  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
    +
    +    // Initiate intermediate aggregate value.
    +    aggregates.foreach(_.initiate(aggregateBuffer))
    +
    +    // Merge intermediate aggregate value to buffer.
    +    var last: Row = null
    +    records.foreach((record) =>  {
    +      aggregates.foreach(_.merge(record, aggregateBuffer))
    +      last = record
    +    })
    +
    +    // Set group keys to aggregateBuffer.
    +    for (i <- 0 until groupKeysMapping.length) {
    --- End diff --
    
    Is this necessary? Looks like we copy the keys first into the `aggregateBuffer` and then into the `output` row. Can't we copy the keys directly from `last` to `output`?
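
A rough sketch of the direct copy suggested here, assuming the `(after, previous)` pair order used for `groupKeysMapping` elsewhere in this PR. Plain arrays stand in for `Row`, and `KeyCopySketch` and `copyKeys` are hypothetical names.

```scala
object KeyCopySketch {
  // groupKeysMapping holds (outputIndex, intermediateIndex) pairs.
  // Keys are read straight from the last record of the group, skipping the aggregate buffer.
  def copyKeys(
      last: Array[Any],
      outputArity: Int,
      groupKeysMapping: Array[(Int, Int)]): Array[Any] = {
    val output = new Array[Any](outputArity)
    groupKeysMapping.foreach { case (after, previous) =>
      output(after) = last(previous)
    }
    output
  }
}
```

All records of one group carry the same key values, so any record of the group, including the last one, can serve as the source.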



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1746#issuecomment-193194745
  
    Thanks Chengxiang for the update. I think we can merge the PR now.
    
    But we should open a JIRA and improve the LongAvg aggregation. The communication between Mapper and GroupReduce for aggregation is Flink internal and there is no need to expose this to Calcite. I am not sure if our abstraction is well chosen, i.e., that a single DataSetNode should only translate to a single DataSet operator. Maybe we should do another refactoring before merging back to master?



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54545477
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala ---
    @@ -17,26 +17,77 @@
      */
     package org.apache.flink.api.table.runtime.aggregate
     
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
    +
     /**
    - * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it
    - * with grouped aggregate field values, and finally get the aggregated value.
    - * @tparam T the output type
    + * The interface for all Flink aggregate functions, which expressed in terms of initiate(),
    + * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases:
    + * -- In Map phase, use prepare() to transform aggregate field value into intermediate
    + * aggregate value.
    + * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values
    + * into aggregate buffer. Then use evaluate() to calculate the final aggregated value.
    + * For associative decomposable aggregate functions, they support partial aggregate. To optimize
    + * the performance, a Combine phase would be added between Map phase and GroupReduce phase,
    + * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values
    + * into aggregate buffer.
    + *
    + * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start
    + * field index in Row, so different aggregate functions could share the same Row as intermediate
    + * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields
    + * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM
    + * primitives, and Flink use intermediateDataType() to get its data types in SQL side.
    + *
    + * @tparam T Aggregated value type.
      */
     trait Aggregate[T] extends Serializable {
    +
    +  protected var aggOffsetInRow: Int = _
    +
       /**
    -   * Initialize the aggregate state.
    +   * Initiate the intermediate aggregate value in Row.
    +   * @param intermediate
        */
    -  def initiateAggregate
    +  def initiate(intermediate: Row): Unit
     
       /**
    -   * Feed the aggregate field value.
    +   * Transform the aggregate field value into intermediate aggregate data.
        * @param value
    +   * @param intermediate
        */
    -  def aggregate(value: Any)
    +  def prepare(value: Any, intermediate: Row): Unit
     
       /**
    -   * Return final aggregated value.
    +   * Merge intermediate aggregate data into aggregate buffer.
    +   * @param intermediate
    +   * @param buffer
    +   */
    +  def merge(intermediate: Row, buffer: Row): Unit
    +
    +  /**
    +   * Calculate the final aggregated result based on aggregate buffer.
    +   * @param buffer
        * @return
        */
    -  def getAggregated(): T
    +  def evaluate(buffer: Row): T
    +
    +  /**
    +   * Intermediate aggregate value types.
    +   * @return
    +   */
    +  def intermediateDataType: Array[SqlTypeName]
    +
    +  /**
    +   * Whether aggregate function support partial aggregate.
    +   * @return
    +   */
    +  def supportPartial: Boolean = true
    --- End diff --
    
    Make the default `false` to be conservative?
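
One way the conservative default could look, as a sketch only. The trait and class names are hypothetical, and the rule that a combiner is used only when every aggregate opts in is an assumption, not something this thread states.

```scala
trait PartialSupportSketch {
  // Conservative default: no Combine phase unless an implementation opts in.
  def supportPartial: Boolean = false
}

// A decomposable aggregate (SUM-like) opts in explicitly.
class SumLikeSketch extends PartialSupportSketch {
  override def supportPartial: Boolean = true
}

// An aggregate that says nothing keeps the safe default.
class OpaqueSketch extends PartialSupportSketch

object PartialSupportSketch {
  // Assumed planning rule: combine only if all aggregates support partial aggregation.
  def useCombiner(aggregates: Seq[PartialSupportSketch]): Boolean =
    aggregates.forall(_.supportPartial)
}
```

With `false` as the default, a new aggregate that forgets to override the flag loses the combiner optimization but cannot produce wrong results because of it.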



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54542545
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMapPartition.scala ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.functions.MapPartitionFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.plan.TypeConverter
    +import org.apache.flink.api.table.typeinfo.RowTypeInfo
    +import org.apache.flink.api.table.{Row, TableConfig}
    +import scala.collection.JavaConverters._
    +/**
    +  * Flink RelNode which matches along with MapPartitionOperator.
    +  *
    +  */
    +class DataSetMapPartition(
    --- End diff --
    
    I think this class is not used and can be removed.



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54857042
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala ---
    @@ -17,26 +17,77 @@
      */
     package org.apache.flink.api.table.runtime.aggregate
     
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
    +
     /**
    - * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it
    - * with grouped aggregate field values, and finally get the aggregated value.
    - * @tparam T the output type
    + * The interface for all Flink aggregate functions, which expressed in terms of initiate(),
    + * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases:
    + * -- In Map phase, use prepare() to transform aggregate field value into intermediate
    + * aggregate value.
    + * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values
    + * into aggregate buffer. Then use evaluate() to calculate the final aggregated value.
    + * For associative decomposable aggregate functions, they support partial aggregate. To optimize
    + * the performance, a Combine phase would be added between Map phase and GroupReduce phase,
    + * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values
    + * into aggregate buffer.
    + *
    + * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start
    + * field index in Row, so different aggregate functions could share the same Row as intermediate
    + * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields
    + * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM
    + * primitives, and Flink use intermediateDataType() to get its data types in SQL side.
    + *
    + * @tparam T Aggregated value type.
      */
     trait Aggregate[T] extends Serializable {
    +
    +  protected var aggOffsetInRow: Int = _
    +
       /**
    -   * Initialize the aggregate state.
    +   * Initiate the intermediate aggregate value in Row.
    +   * @param intermediate
        */
    -  def initiateAggregate
    +  def initiate(intermediate: Row): Unit
     
       /**
    -   * Feed the aggregate field value.
    +   * Transform the aggregate field value into intermediate aggregate data.
        * @param value
    +   * @param intermediate
        */
    -  def aggregate(value: Any)
    +  def prepare(value: Any, intermediate: Row): Unit
    --- End diff --
    
    Yes, I meant to do that at the level of a concrete aggregation function implementation not on the interface / abstract class level. Sorry for the confusion caused by placing this comment in the interface code.
    Most of the runtime code, including code gen, serializers, and comparators, already supports `null` values. IMO it would be good to add `null` support for the currently supported aggregation functions with this PR as well. What do you think?
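
A sketch of what `null` handling inside a concrete aggregate might look like, using an integer `AVG` with a `(sum, count)` intermediate value. Tuples stand in for the intermediate `Row`, all names are hypothetical, and returning `None` when every input is `null` is an assumption about the desired semantics.

```scala
object NullAwareAvgSketch {
  // Intermediate value is (sum, count); a null input contributes nothing to either.
  def prepare(value: Any): (Long, Long) =
    if (value == null) (0L, 0L) else (value.asInstanceOf[Int].toLong, 1L)

  // Merging is component-wise addition, so it is safe to apply in a Combine phase.
  def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) =
    (a._1 + b._1, a._2 + b._2)

  // With no non-null input there is no average; modelled here as None.
  def evaluate(buffer: (Long, Long)): Option[Long] =
    if (buffer._2 == 0L) None else Some(buffer._1 / buffer._2)

  def avg(values: Seq[Any]): Option[Long] =
    evaluate(values.map(prepare).foldLeft((0L, 0L))(merge))
}
```

Unlike `SUM`, writing a plain `0` for a `null` is not enough for `AVG`: the count must also stay untouched, otherwise the `null` would pull the average down.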



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54548916
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction, RichMapPartitionFunction}
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    + * It wraps the aggregate logic inside of 
    + * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
    + *
    + * @param aggregates   The aggregate functions.
    + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row 
    + *                         and output Row.
    + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
    + *                         index in output Row.
    + */
    +class AggregateReduceGroupFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)])
    +    extends RichGroupReduceFunction[Row, Row] {
    +
    +  private final val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(groupKeysMapping)
    +  }
    +
    +  /**
    +   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    +   * calculate aggregated values output by aggregate buffer, and set them into output 
    +   * Row based on the mapping relation between intermediate aggregate data and output data.
    +   *
    +   * @param records  Grouped intermediate aggregate Rows iterator.
    +   * @param out The collector to hand results to.
    +   *
    +   */
    +  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
    +    
    +    val inputItr = records.iterator
    +    val buffer = inputItr.next
    +    while (inputItr.hasNext) {
    +      val next = inputItr.next
    +      aggregates.foreach(_.merge(next, buffer))
    +    }
    +
    +    val output: Row = new Row(finalRowLength)
    +
    +    groupKeysMapping.map {
    +      case (after, previous) =>
    +        output.setField(after, buffer.productElement(previous))
    +    }
    +
    +    aggregateMapping.map {
    +      case (after, previous) =>
    +        output.setField(after, aggregates(previous).evaluate(buffer))
    +    }
    +
    +    out.collect(output)
    +  }
    +}
    +
    +/**
    + * It wraps the aggregate logic inside of 
    + * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and 
    + * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
    + *
    + * @param aggregates   The aggregate functions.
    + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row 
    + *                         and output Row.
    + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
    + *                         index in output Row.
    + */
    +class AggregateReduceCombineFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)])
    +    extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] {
    +
    +  private val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(groupKeysMapping)
    +  }
    +
    +  /**
    +   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    +   * calculate aggregated values output by aggregate buffer, and set them into output 
    +   * Row based on the mapping relation between intermediate aggregate Row and output Row.
    +   *
    +   * @param records  Grouped intermediate aggregate Rows iterator.
    +   * @param out The collector to hand results to.
    +   *
    +   */
    +  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
    +    val inputItr = records.iterator
    +    val buffer = inputItr.next
    +    while (inputItr.hasNext) {
    +      val next = inputItr.next
    +      aggregates.foreach(_.merge(next, buffer))
    +    }
    +
    +    val output: Row = new Row(finalRowLength)
    +
    +    groupKeysMapping.map {
    +      case (after, previous) =>
    +        output.setField(after, buffer.productElement(previous))
    +    }
    +
    +    aggregateMapping.map {
    +      case (after, previous) =>
    +        output.setField(after, aggregates(previous).evaluate(buffer))
    +    }
    +
    +    out.collect(output)
    +  }
    +
    +  /**
    +   * For sub-grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    +   *
    +   * @param records  Sub-grouped intermediate aggregate Rows iterator.
    +   * @return Combined intermediate aggregate Row.
    +   *
    +   */
    +  override def combine(records: Iterable[Row]): Row = {
    +
    +    val inputItr = records.iterator
    +    val buffer = inputItr.next
    +    while (inputItr.hasNext) {
    +      val next = inputItr.next
    --- End diff --
    
    Object reuse issue.
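
To illustrate the hazard, here is a sketch outside of Flink. `reusingIterator` is a hypothetical iterator that returns the same mutable object on every call, which imitates what a runtime may do when object reuse is enabled. `unsafeSum` mirrors the pattern in the diff (keep the first record as the buffer) and `safeSum` copies values into a buffer the function owns.

```scala
object ObjectReuseSketch {
  // Hypothetical iterator that hands out the same mutable array on each next().
  def reusingIterator(values: Seq[Long]): Iterator[Array[Long]] = {
    val reused = new Array[Long](1)
    values.iterator.map { v => reused(0) = v; reused }
  }

  // Keeps a reference to the first record and merges later records into it.
  def unsafeSum(records: Iterator[Array[Long]]): Long = {
    val buffer = records.next()
    while (records.hasNext) {
      val next = records.next() // same object as `buffer`, so the running sum is overwritten
      buffer(0) = buffer(0) + next(0)
    }
    buffer(0)
  }

  // Copies each value into a buffer that the iterator never touches.
  def safeSum(records: Iterator[Array[Long]]): Long = {
    val buffer = Array(0L)
    records.foreach(r => buffer(0) += r(0))
    buffer(0)
  }
}
```

For the inputs 1, 2, 4 the unsafe variant yields 8 instead of 7, because the buffer and the current record are the same object.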



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r55014256
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala ---
    @@ -17,115 +17,144 @@
      */
     package org.apache.flink.api.table.runtime.aggregate
     
    -import java.math.BigInteger
     import com.google.common.math.LongMath
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
     
    -// for byte, short, int we return int
    -abstract class IntegralAvgAggregate[T: Numeric] extends Aggregate[T] {
    +abstract class IntegralAvgAggregate[T] extends Aggregate[T] {
    +  private final val intermediateType = Array(SqlTypeName.BIGINT, SqlTypeName.BIGINT)
       
    -  var sum: Long = 0
    -  var count: Long = 0
    +  override def initiate(partial: Row): Unit = {
    +    partial.setField(aggOffsetInRow, 0L)
    +    partial.setField(aggOffsetInRow + 1, 0L)
    +  }
    +
    +  override def merge(partial: Row, buffer: Row): Unit = {
    +    val partialSum = partial.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val partialCount = partial.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    val bufferSum = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val bufferCount = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    buffer.setField(aggOffsetInRow, LongMath.checkedAdd(partialSum, bufferSum))
    +    buffer.setField(aggOffsetInRow + 1, LongMath.checkedAdd(partialCount, bufferCount))
    +  }
     
    -  override def initiateAggregate: Unit = {
    -    sum = 0
    -    count = 0
    +  override def intermediateDataType: Array[SqlTypeName] = {
    +    intermediateType
       }
     
    +  override def supportPartial: Boolean = true
     }
     
     class ByteAvgAggregate extends IntegralAvgAggregate[Byte] {
    -
    -  override def aggregate(value: Any): Unit = {
    -    count += 1
    -    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Byte])
    +  override def prepare(value: Any, partial: Row): Unit = {
    +    val input = value.asInstanceOf[Byte]
    +    partial.setField(aggOffsetInRow, input.toLong)
    +    partial.setField(aggOffsetInRow + 1, 1L)
       }
     
    -  override def getAggregated(): Byte = {
    -    (sum / count).toByte
    +  override def evaluate(buffer: Row): Byte = {
    +    val bufferSum = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val bufferCount = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    (bufferSum / bufferCount).toByte
       }
     }
     
     class ShortAvgAggregate extends IntegralAvgAggregate[Short] {
     
    -  override def aggregate(value: Any): Unit = {
    -    count += 1
    -    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Short])
    +  override def prepare(value: Any, partial: Row): Unit = {
    +    val input = value.asInstanceOf[Short]
    +    partial.setField(aggOffsetInRow, input.toLong)
    +    partial.setField(aggOffsetInRow + 1, 1L)
       }
     
    -  override def getAggregated(): Short = {
    -    (sum / count).toShort
    +  override def evaluate(buffer: Row): Short = {
    +    val bufferSum = buffer.productElement(aggOffsetInRow).asInstanceOf[Long]
    +    val bufferCount = buffer.productElement(aggOffsetInRow + 1).asInstanceOf[Long]
    +    (bufferSum / bufferCount).toShort
       }
     }
     
     class IntAvgAggregate extends IntegralAvgAggregate[Int] {
     
    -  override def aggregate(value: Any): Unit = {
    -    count += 1
    -    sum = LongMath.checkedAdd(sum, value.asInstanceOf[Int])
    +  override def prepare(value: Any, partial: Row): Unit = {
    +    val input = value.asInstanceOf[Int]
    +    partial.setField(aggOffsetInRow, input.toLong)
    --- End diff --
    
    The previous version used a `BigInteger` to compute the sum of a `Long` average. Can you change it back or explain why you would like to change that?
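
As an illustration of the concern above, the following minimal sketch contrasts an overflow-checked `Long` sum with a `BigInteger` sum when averaging `Long` values. The object and method names are hypothetical, and `Math.addExact` is used as a stand-in for Guava's `LongMath.checkedAdd` (both throw `ArithmeticException` on overflow); it is not the PR's actual code.

```scala
import java.math.BigInteger

object LongAvgSketch {
  // Overflow-checked accumulation: throws ArithmeticException as soon as the
  // running sum leaves the Long range, even if the average itself would fit.
  def checkedAvg(values: Seq[Long]): Long = {
    val sum = values.foldLeft(0L)((acc, v) => Math.addExact(acc, v))
    sum / values.size
  }

  // BigInteger accumulation: the sum may exceed Long.MaxValue; the average of
  // Long values always fits back into a Long.
  def bigIntegerAvg(values: Seq[Long]): Long = {
    val sum = values.foldLeft(BigInteger.ZERO)((acc, v) => acc.add(BigInteger.valueOf(v)))
    sum.divide(BigInteger.valueOf(values.size.toLong)).longValue()
  }
}
```

With inputs such as `Seq(Long.MaxValue, Long.MaxValue)`, the checked variant fails while the `BigInteger` variant still returns `Long.MaxValue`. Both variants assume a non-empty input.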



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on the pull request:

    https://github.com/apache/flink/pull/1746#issuecomment-191621309
  
    Hi @fhueske, I found 2 test failures caused by the input data of `AggregateMapFunction` being `Tuple` instead of `Row` when the table config is `EFFICIENT`. I thought `Tuple` and other types were only supported as data sources, and that SQL operators only supported `Row` internally. Why was this introduced? Is it more efficient? I thought the roadmap was heading toward a `Row` that stores binary data.



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54564731
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala ---
    @@ -17,26 +17,77 @@
      */
     package org.apache.flink.api.table.runtime.aggregate
     
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
    +
     /**
    - * Represents a SQL aggregate function. The user should first initialize the aggregate, then feed it
    - * with grouped aggregate field values, and finally get the aggregated value.
    - * @tparam T the output type
    + * The interface for all Flink aggregate functions, which expressed in terms of initiate(),
    + * prepare(), merge() and evaluate(). The aggregate functions would be executed in 2 phases:
    + * -- In Map phase, use prepare() to transform aggregate field value into intermediate
    + * aggregate value.
    + * -- In GroupReduce phase, use merge() to merge grouped intermediate aggregate values
    + * into aggregate buffer. Then use evaluate() to calculate the final aggregated value.
    + * For associative decomposable aggregate functions, they support partial aggregate. To optimize
    + * the performance, a Combine phase would be added between Map phase and GroupReduce phase,
    + * -- In Combine phase, use merge() to merge sub-grouped intermediate aggregate values
    + * into aggregate buffer.
    + *
    + * The intermediate aggregate value is stored inside Row, aggOffsetInRow is used as the start
    + * field index in Row, so different aggregate functions could share the same Row as intermediate
    + * aggregate value/aggregate buffer, as their aggregate values could be stored in distinct fields
    + * of Row with no conflict. The intermediate aggregate value is required to be a sequence of JVM
    + * primitives, and Flink use intermediateDataType() to get its data types in SQL side.
    + *
    + * @tparam T Aggregated value type.
      */
     trait Aggregate[T] extends Serializable {
    +
    +  protected var aggOffsetInRow: Int = _
    +
       /**
    -   * Initialize the aggregate state.
    +   * Initiate the intermediate aggregate value in Row.
    +   * @param intermediate
        */
    -  def initiateAggregate
    +  def initiate(intermediate: Row): Unit
    --- End diff --
    
    I did not find a usage of the `initiate()` method. However, I think we can use it to initialize the buffer to make the `reduce()` and `combine()` methods safe with respect to object reuse.
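
One way to read this suggestion is sketched below, assuming a simplified aggregate with a single `Long` field and an `Array[Any]` in place of `Row`. `SumAgg` and `reduce` are hypothetical names, and group keys are left out for brevity.

```scala
object InitiateSketch {
  // Hypothetical stand-in for a sum aggregate that owns one field at `offset`.
  final class SumAgg(offset: Int) {
    // Reset this aggregate's slot in the buffer to the neutral element.
    def initiate(buffer: Array[Any]): Unit = {
      buffer(offset) = 0L
    }

    // Fold one intermediate record into the buffer.
    def merge(partial: Array[Any], buffer: Array[Any]): Unit = {
      buffer(offset) = buffer(offset).asInstanceOf[Long] + partial(offset).asInstanceOf[Long]
    }
  }

  // The function owns `buffer`, so no input record has to stay valid across
  // successive next() calls on the iterator.
  def reduce(records: Iterator[Array[Any]], agg: SumAgg, buffer: Array[Any]): Array[Any] = {
    agg.initiate(buffer)
    records.foreach(record => agg.merge(record, buffer))
    buffer
  }
}
```

Because `initiate()` resets the buffer on every call, the same buffer instance could be allocated once and reused for each group.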



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54548253
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction, RichMapPartitionFunction}
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    + * It wraps the aggregate logic inside of 
    + * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
    + *
    + * @param aggregates   The aggregate functions.
    + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row 
    + *                         and output Row.
    + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
    + *                         index in output Row.
    + */
    +class AggregateReduceGroupFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)])
    +    extends RichGroupReduceFunction[Row, Row] {
    +
    +  private final val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(groupKeysMapping)
    +  }
    +
    +  /**
    +   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    +   * calculate aggregated values output by aggregate buffer, and set them into output 
    +   * Row based on the mapping relation between intermediate aggregate data and output data.
    +   *
    +   * @param records  Grouped intermediate aggregate Rows iterator.
    +   * @param out The collector to hand results to.
    +   *
    +   */
    +  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
    +    
    +    val inputItr = records.iterator
    +    val buffer = inputItr.next
    +    while (inputItr.hasNext) {
    +      val next = inputItr.next
    --- End diff --
    
    This may lead to wrong results if object reuse mode is enabled because `buffer` may change its value when `next()` is called. See PR #1721 which improves the current object reuse documentation. 
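
The hazard can be reproduced outside Flink with a small sketch. It assumes an iterator that hands back the same mutable instance on every `next()` call, which only mimics object reuse mode; `Rec`, `reusingIterator`, `unsafeSum` and `safeSum` are hypothetical names.

```scala
object ObjectReuseSketch {
  // Hypothetical mutable record, standing in for Row.
  final class Rec(var sum: Long)

  // Mimics an object-reusing runtime: every next() returns the same instance,
  // overwritten with the next value.
  def reusingIterator(values: Seq[Long]): Iterator[Rec] = {
    val shared = new Rec(0L)
    values.iterator.map { v => shared.sum = v; shared }
  }

  // Unsafe: `buffer` aliases the shared instance, so each next() overwrites
  // the partial result accumulated so far.
  def unsafeSum(it: Iterator[Rec]): Long = {
    val buffer = it.next()
    while (it.hasNext) {
      val next = it.next()
      buffer.sum += next.sum
    }
    buffer.sum
  }

  // Safe: accumulate into an object that the function itself owns.
  def safeSum(it: Iterator[Rec]): Long = {
    val buffer = new Rec(0L)
    while (it.hasNext) buffer.sum += it.next().sum
    buffer.sum
  }
}
```

For the input `1, 2, 4`, the unsafe variant ends up doubling the last element instead of summing all three, while the safe variant returns the expected total.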



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54553557
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -0,0 +1,309 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.calcite.rel.`type`._
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
    +import org.apache.calcite.sql.fun._
    +import org.apache.flink.api.common.functions.{GroupReduceFunction, MapFunction}
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.api.table.plan.PlanGenException
    +import org.apache.flink.api.table.plan.nodes.logical.FlinkAggregate
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +object AggregateUtil {
    +
    +  /**
    +   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink 
    +   * operator functions:
    +   * [[org.apache.flink.api.common.functions.MapFunction]] and 
    +   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
    +   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well). 
    +   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the 
    +   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
    +   * format:
    +   *
    +   * {{{
    +   *                   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
    +   *                             |                          |
    +   *                             v                          v
    +   *        +---------+---------+--------+--------+--------+--------+
    +   *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
    +   *        +---------+---------+--------+--------+--------+--------+
    +   *                                              ^
    +   *                                              |
    +   *                               sum(y) aggOffsetInRow = 4
    +   * }}}
    +   *
    +   */
    +  def createOperatorFunctionsForAggregates(aggregate: FlinkAggregate,
    +      inputType: RelDataType, outputType: RelDataType,
    +      groupings: Array[Int]): AggregateResult = {
    +
    +    val aggregateCalls: Seq[AggregateCall] = aggregate.getAggCallList
    +    // store the aggregate fields of each aggregate function, by the same order of aggregates.
    +    val aggFieldIndexes = new Array[Int](aggregateCalls.size)
    +    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
    +
    +    transformToAggregateFunctions(aggregateCalls, aggFieldIndexes,
    +      aggregates, inputType, groupings.length)
    +
    +    val mapFunction = new AggregateMapFunction(aggregates, aggFieldIndexes, groupings)
    +
    +    val bufferDataType: RelRecordType =
    +      createAggregateBufferDataType(groupings, aggregates, inputType)
    +
    +    // the mapping relation between field index of intermediate aggregate Row and output Row.
    +    var groupingOffsetMapping = ArrayBuffer[(Int, Int)]()
    +
    +    // the mapping relation between aggregate function index in list and its corresponding
    +    // field index in output Row.
    +    var aggOffsetMapping = ArrayBuffer[(Int, Int)]()
    +
    +
    +    outputType.getFieldList.zipWithIndex.foreach {
    +      case (fieldType: RelDataTypeField, outputIndex: Int) =>
    +
    +        val aggregateIndex: Int = getMatchedAggregateIndex(aggregate, fieldType)
    +        if (aggregateIndex != -1) {
    +          aggOffsetMapping += ((outputIndex, aggregateIndex))
    +        } else {
    +          val groupKeyIndex: Int = getMatchedFieldIndex(inputType, fieldType, groupings)
    +          if (groupKeyIndex != -1) {
    +            groupingOffsetMapping += ((outputIndex, groupKeyIndex))
    +          } else {
    +            throw new PlanGenException("Could not find output field in input data type " +
    +                "or aggregate function.")
    +          }
    +        }
    +    }
    +
    +    val allPartialAggregate = aggregates.map(_.supportPartial).foldLeft(true)(_ && _)
    +
    +    val reduceGroupFunction =
    +      if (allPartialAggregate) {
    +        new AggregateReduceCombineFunction(aggregates, groupingOffsetMapping.toArray,
    +          aggOffsetMapping.toArray)
    +      } else {
    +        new AggregateReduceGroupFunction(aggregates, groupingOffsetMapping.toArray,
    +          aggOffsetMapping.toArray)
    +      }
    +
    +    new AggregateResult(mapFunction, reduceGroupFunction, bufferDataType)
    +  }
    +
    +  private def transformToAggregateFunctions(
    +      aggregateCalls: Seq[AggregateCall],
    +      aggFieldIndexes: Array[Int],
    +      aggregates: Array[Aggregate[_ <: Any]],
    +      inputType: RelDataType,
    +      groupKeysCount: Int): Unit = {
    +
    +    // set the start offset of aggregate buffer value to group keys' length, 
    +    // as all the group keys would be moved to the start fields of intermediate
    +    // aggregate data.
    +    var aggOffset = groupKeysCount
    +
    +    // create aggregate function instances by function type and aggregate field data type.
    +    aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
    +      val argList: util.List[Integer] = aggregateCall.getArgList
    +      if (argList.isEmpty) {
    +        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
    +          aggFieldIndexes(index) = 0
    +        } else {
    +          throw new PlanGenException("Aggregate fields should not be empty.")
    +        }
    +      } else {
    +        if (argList.size() > 1) {
    +          throw new PlanGenException("Currently, do not support aggregate on multi fields.")
    +        }
    +        aggFieldIndexes(index) = argList.get(0)
    +      }
    +      val sqlTypeName = inputType.getFieldList.get(aggFieldIndexes(index)).getType.getSqlTypeName
    +      aggregateCall.getAggregation match {
    +        case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => {
    +          sqlTypeName match {
    +            case TINYINT =>
    +              aggregates(index) = new ByteSumAggregate
    +            case SMALLINT =>
    +              aggregates(index) = new ShortSumAggregate
    +            case INTEGER =>
    +              aggregates(index) = new IntSumAggregate
    +            case BIGINT =>
    +              aggregates(index) = new LongSumAggregate
    +            case FLOAT =>
    +              aggregates(index) = new FloatSumAggregate
    +            case DOUBLE =>
    +              aggregates(index) = new DoubleSumAggregate
    +            case sqlType: SqlTypeName =>
    +              throw new PlanGenException("Sum aggregate does no support type:" + sqlType)
    +          }
    +          setAggregateDataOffset(index)
    +        }
    +        case _: SqlAvgAggFunction => {
    +          sqlTypeName match {
    +            case TINYINT =>
    +              aggregates(index) = new ByteAvgAggregate
    +            case SMALLINT =>
    +              aggregates(index) = new ShortAvgAggregate
    +            case INTEGER =>
    +              aggregates(index) = new IntAvgAggregate
    +            case BIGINT =>
    +              aggregates(index) = new LongAvgAggregate
    +            case FLOAT =>
    +              aggregates(index) = new FloatAvgAggregate
    +            case DOUBLE =>
    +              aggregates(index) = new DoubleAvgAggregate
    +            case sqlType: SqlTypeName =>
    +              throw new PlanGenException("Avg aggregate does no support type:" + sqlType)
    +          }
    +          setAggregateDataOffset(index)
    +        }
    +        case sqlMinMaxFunction: SqlMinMaxAggFunction => {
    +          if (sqlMinMaxFunction.isMin) {
    +            sqlTypeName match {
    +              case TINYINT =>
    +                aggregates(index) = new ByteMinAggregate
    +              case SMALLINT =>
    +                aggregates(index) = new ShortMinAggregate
    +              case INTEGER =>
    +                aggregates(index) = new IntMinAggregate
    +              case BIGINT =>
    +                aggregates(index) = new LongMinAggregate
    +              case FLOAT =>
    +                aggregates(index) = new FloatMinAggregate
    +              case DOUBLE =>
    +                aggregates(index) = new DoubleMinAggregate
    +              case sqlType: SqlTypeName =>
    +                throw new PlanGenException("Min aggregate does no support type:" + sqlType)
    +            }
    +          } else {
    +            sqlTypeName match {
    +              case TINYINT =>
    +                aggregates(index) = new ByteMaxAggregate
    +              case SMALLINT =>
    +                aggregates(index) = new ShortMaxAggregate
    +              case INTEGER =>
    +                aggregates(index) = new IntMaxAggregate
    +              case BIGINT =>
    +                aggregates(index) = new LongMaxAggregate
    +              case FLOAT =>
    +                aggregates(index) = new FloatMaxAggregate
    +              case DOUBLE =>
    +                aggregates(index) = new DoubleMaxAggregate
    +              case sqlType: SqlTypeName =>
    +                throw new PlanGenException("Max aggregate does no support type:" + sqlType)
    +            }
    +          }
    +          setAggregateDataOffset(index)
    +        }
    +        case _: SqlCountAggFunction =>
    +          aggregates(index) = new CountAggregate
    +          setAggregateDataOffset(index)
    +        case unSupported: SqlAggFunction =>
    +          throw new PlanGenException("unsupported Function: " + unSupported.getName)
    +      }
    +    }
    +
    +    // set the aggregate intermediate data start index in Row, and update current value.
    +    def setAggregateDataOffset(index: Int): Unit = {
    +      aggregates(index).setAggOffsetInRow(aggOffset)
    +      aggOffset += aggregates(index).intermediateDataType.length
    +    }
    +  }
    +
    +  private def createAggregateBufferDataType(
    +      groupings: Array[Int],
    +      aggregates: Array[Aggregate[_]],
    +      inputType: RelDataType): RelRecordType = {
    +
    +    // get the field data types of group keys.
    +    val groupingTypes: Seq[RelDataTypeField] = groupings.map(inputType.getFieldList.get(_))
    +
    +    val aggPartialNameSuffix = "agg_buffer_"
    +    val factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
    +
    +    // get all the aggregate buffer value data type by their SqlTypeName.
    +    val aggTypes: Seq[RelDataTypeField] =
    +      aggregates.flatMap(_.intermediateDataType).zipWithIndex.map {
    +        case (typeName: SqlTypeName, index: Int) =>
    +          val fieldDataType = factory.createSqlType(typeName)
    +          new RelDataTypeFieldImpl(aggPartialNameSuffix + index,
    +            groupings.length + index, fieldDataType)
    +      }
    +
    +    val allFieldTypes = groupingTypes ++: aggTypes
    +    val partialType = new RelRecordType(allFieldTypes.toList)
    +    partialType
    +  }
    +
    +  private def getMatchedAggregateIndex(aggregate: FlinkAggregate,
    +      outputFieldType: RelDataTypeField): Int = {
    +
    +    aggregate.getNamedAggCalls.zipWithIndex.foreach {
    +      case (namedAggCall, index) =>
    +        if (namedAggCall.getValue.equals(outputFieldType.getName) &&
    +            namedAggCall.getKey.getType.equals(outputFieldType.getType)) {
    +          return index
    +        }
    +    }
    +
    +    -1
    +  }
    +
    +  private def getMatchedFieldIndex(inputDatType: RelDataType,
    +      outputFieldType: RelDataTypeField, groupKeys: Array[Int]): Int = {
    --- End diff --
    
    Change return type to `Option[Int]` as well?
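
A minimal sketch of what an `Option[Int]` return type could look like, using plain field names instead of Calcite types. `matchedIndex` and `resolve` are hypothetical helpers, not the PR's methods, and `IllegalArgumentException` stands in for `PlanGenException`.

```scala
object OptionIndexSketch {
  // Position of the first name equal to `wanted`, or None when nothing matches
  // (replaces the -1 sentinel).
  def matchedIndex(names: Seq[String], wanted: String): Option[Int] =
    names.zipWithIndex.collectFirst { case (name, index) if name == wanted => index }

  // The caller resolves an output field against the aggregates first, then the
  // group keys, and fails only if both lookups return None.
  def resolve(aggNames: Seq[String], keyNames: Seq[String], field: String): String =
    matchedIndex(aggNames, field).map(i => s"aggregate $i")
      .orElse(matchedIndex(keyNames, field).map(i => s"groupKey $i"))
      .getOrElse(throw new IllegalArgumentException(s"Could not find output field: $field"))
}
```

The caller no longer compares against a magic value, and the fallback order is expressed by `orElse` rather than nested `if` blocks.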



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on the pull request:

    https://github.com/apache/flink/pull/1746#issuecomment-191186632
  
    Hi @fhueske, I've updated the PR based on your comments, except for the object reuse in `AggregateGroupCombineFunction`. I found that `GroupReduceOperator` transforms it into a `CombineToGroupCombineWrapper`, which does not implement a rich user function, so we cannot initialize the reused object in `open()`. I think we need to add a `RichCombineToGroupCombineWrapper` to wrap rich combinable `GroupReduceFunction`s such as `AggregateGroupCombineFunction`. What do you think?



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on the pull request:

    https://github.com/apache/flink/pull/1746#issuecomment-191698778
  
    Thanks @fhueske, I've updated the PR.



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54543822
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.functions.{MapFunction, MapPartitionFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.plan.TypeConverter
    +import org.apache.flink.api.table.typeinfo.RowTypeInfo
    +import org.apache.flink.api.table.{Row, TableConfig}
    +
    +import scala.collection.JavaConverters._
    +
    +/**
    +  * Flink RelNode which matches along with MapOperator.
    +  *
    +  */
    +class DataSetMap(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    input: RelNode,
    +    rowType: RelDataType,
    +    opName: String,
    +    func: MapFunction[Row, Row])
    --- End diff --
    
    Can we make the interface compatible with the other DataSetOperators and accept a function that returns the `MapFunction` instead of the `MapFunction` itself: 
    ```
    func: (TableConfig, TypeInformation[Any], TypeInformation[Any]) => MapFunction[Any, Any])
    ```
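
The shape of that signature can be sketched with stand-in types. `Config`, `TypeInfo`, `MapFn` and `MapNode` below are hypothetical simplifications of `TableConfig`, `TypeInformation`, `MapFunction` and `DataSetMap`, so this only illustrates the factory pattern, not the actual Flink classes.

```scala
object FactorySketch {
  // Hypothetical stand-ins for TableConfig, TypeInformation and MapFunction.
  final case class Config(efficient: Boolean)
  final case class TypeInfo(name: String)
  trait MapFn[I, O] { def map(in: I): O }

  // The node stores a factory rather than a function instance, so the function
  // is built at translation time, once the config and types are known.
  final class MapNode(func: (Config, TypeInfo, TypeInfo) => MapFn[Any, Any]) {
    def translate(config: Config, in: TypeInfo, out: TypeInfo): MapFn[Any, Any] =
      func(config, in, out)
  }

  // Example factory: the produced function depends on the input and output types.
  val describing: (Config, TypeInfo, TypeInfo) => MapFn[Any, Any] =
    (_, in, out) => new MapFn[Any, Any] {
      def map(v: Any): Any = s"${in.name}->${out.name}:$v"
    }
}
```

A node built as `new FactorySketch.MapNode(FactorySketch.describing)` can then produce a different function for each combination of input and output type.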



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54553521
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -0,0 +1,309 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.calcite.rel.`type`._
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
    +import org.apache.calcite.sql.fun._
    +import org.apache.flink.api.common.functions.{GroupReduceFunction, MapFunction}
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.api.table.plan.PlanGenException
    +import org.apache.flink.api.table.plan.nodes.logical.FlinkAggregate
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +object AggregateUtil {
    +
    +  /**
    +   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink 
    +   * operator functions:
    +   * [[org.apache.flink.api.common.functions.MapFunction]] and 
    +   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
    +   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well). 
    +   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the 
    +   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
    +   * format:
    +   *
    +   * {{{
    +   *                   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
    +   *                             |                          |
    +   *                             v                          v
    +   *        +---------+---------+--------+--------+--------+--------+
    +   *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
    +   *        +---------+---------+--------+--------+--------+--------+
    +   *                                              ^
    +   *                                              |
    +   *                               sum(y) aggOffsetInRow = 4
    +   * }}}
    +   *
    +   */
    +  def createOperatorFunctionsForAggregates(aggregate: FlinkAggregate,
    +      inputType: RelDataType, outputType: RelDataType,
    +      groupings: Array[Int]): AggregateResult = {
    +
    +    val aggregateCalls: Seq[AggregateCall] = aggregate.getAggCallList
    +    // store the aggregate fields of each aggregate function, by the same order of aggregates.
    +    val aggFieldIndexes = new Array[Int](aggregateCalls.size)
    +    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
    +
    +    transformToAggregateFunctions(aggregateCalls, aggFieldIndexes,
    +      aggregates, inputType, groupings.length)
    +
    +    val mapFunction = new AggregateMapFunction(aggregates, aggFieldIndexes, groupings)
    +
    +    val bufferDataType: RelRecordType =
    +      createAggregateBufferDataType(groupings, aggregates, inputType)
    +
    +    // the mapping relation between field index of intermediate aggregate Row and output Row.
    +    var groupingOffsetMapping = ArrayBuffer[(Int, Int)]()
    +
    +    // the mapping relation between aggregate function index in list and its corresponding
    +    // field index in output Row.
    +    var aggOffsetMapping = ArrayBuffer[(Int, Int)]()
    +
    +
    +    outputType.getFieldList.zipWithIndex.foreach {
    +      case (fieldType: RelDataTypeField, outputIndex: Int) =>
    +
    +        val aggregateIndex: Int = getMatchedAggregateIndex(aggregate, fieldType)
    +        if (aggregateIndex != -1) {
    +          aggOffsetMapping += ((outputIndex, aggregateIndex))
    +        } else {
    +          val groupKeyIndex: Int = getMatchedFieldIndex(inputType, fieldType, groupings)
    +          if (groupKeyIndex != -1) {
    +            groupingOffsetMapping += ((outputIndex, groupKeyIndex))
    +          } else {
    +            throw new PlanGenException("Could not find output field in input data type " +
    +                "or aggregate function.")
    +          }
    +        }
    +    }
    +
    +    val allPartialAggregate = aggregates.map(_.supportPartial).foldLeft(true)(_ && _)
    +
    +    val reduceGroupFunction =
    +      if (allPartialAggregate) {
    +        new AggregateReduceCombineFunction(aggregates, groupingOffsetMapping.toArray,
    +          aggOffsetMapping.toArray)
    +      } else {
    +        new AggregateReduceGroupFunction(aggregates, groupingOffsetMapping.toArray,
    +          aggOffsetMapping.toArray)
    +      }
    +
    +    new AggregateResult(mapFunction, reduceGroupFunction, bufferDataType)
    +  }
    +
    +  private def transformToAggregateFunctions(
    +      aggregateCalls: Seq[AggregateCall],
    +      aggFieldIndexes: Array[Int],
    +      aggregates: Array[Aggregate[_ <: Any]],
    +      inputType: RelDataType,
    +      groupKeysCount: Int): Unit = {
    +
    +    // set the start offset of aggregate buffer value to group keys' length, 
    +    // as all the group keys would be moved to the start fields of intermediate
    +    // aggregate data.
    +    var aggOffset = groupKeysCount
    +
    +    // create aggregate function instances by function type and aggregate field data type.
    +    aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
    +      val argList: util.List[Integer] = aggregateCall.getArgList
    +      if (argList.isEmpty) {
    +        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
    +          aggFieldIndexes(index) = 0
    +        } else {
    +          throw new PlanGenException("Aggregate fields should not be empty.")
    +        }
    +      } else {
    +        if (argList.size() > 1) {
    +          throw new PlanGenException("Currently, do not support aggregate on multi fields.")
    +        }
    +        aggFieldIndexes(index) = argList.get(0)
    +      }
    +      val sqlTypeName = inputType.getFieldList.get(aggFieldIndexes(index)).getType.getSqlTypeName
    +      aggregateCall.getAggregation match {
    +        case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => {
    +          sqlTypeName match {
    +            case TINYINT =>
    +              aggregates(index) = new ByteSumAggregate
    +            case SMALLINT =>
    +              aggregates(index) = new ShortSumAggregate
    +            case INTEGER =>
    +              aggregates(index) = new IntSumAggregate
    +            case BIGINT =>
    +              aggregates(index) = new LongSumAggregate
    +            case FLOAT =>
    +              aggregates(index) = new FloatSumAggregate
    +            case DOUBLE =>
    +              aggregates(index) = new DoubleSumAggregate
    +            case sqlType: SqlTypeName =>
    +              throw new PlanGenException("Sum aggregate does no support type:" + sqlType)
    +          }
    +          setAggregateDataOffset(index)
    +        }
    +        case _: SqlAvgAggFunction => {
    +          sqlTypeName match {
    +            case TINYINT =>
    +              aggregates(index) = new ByteAvgAggregate
    +            case SMALLINT =>
    +              aggregates(index) = new ShortAvgAggregate
    +            case INTEGER =>
    +              aggregates(index) = new IntAvgAggregate
    +            case BIGINT =>
    +              aggregates(index) = new LongAvgAggregate
    +            case FLOAT =>
    +              aggregates(index) = new FloatAvgAggregate
    +            case DOUBLE =>
    +              aggregates(index) = new DoubleAvgAggregate
    +            case sqlType: SqlTypeName =>
    +              throw new PlanGenException("Avg aggregate does no support type:" + sqlType)
    +          }
    +          setAggregateDataOffset(index)
    +        }
    +        case sqlMinMaxFunction: SqlMinMaxAggFunction => {
    +          if (sqlMinMaxFunction.isMin) {
    +            sqlTypeName match {
    +              case TINYINT =>
    +                aggregates(index) = new ByteMinAggregate
    +              case SMALLINT =>
    +                aggregates(index) = new ShortMinAggregate
    +              case INTEGER =>
    +                aggregates(index) = new IntMinAggregate
    +              case BIGINT =>
    +                aggregates(index) = new LongMinAggregate
    +              case FLOAT =>
    +                aggregates(index) = new FloatMinAggregate
    +              case DOUBLE =>
    +                aggregates(index) = new DoubleMinAggregate
    +              case sqlType: SqlTypeName =>
    +                throw new PlanGenException("Min aggregate does no support type:" + sqlType)
    +            }
    +          } else {
    +            sqlTypeName match {
    +              case TINYINT =>
    +                aggregates(index) = new ByteMaxAggregate
    +              case SMALLINT =>
    +                aggregates(index) = new ShortMaxAggregate
    +              case INTEGER =>
    +                aggregates(index) = new IntMaxAggregate
    +              case BIGINT =>
    +                aggregates(index) = new LongMaxAggregate
    +              case FLOAT =>
    +                aggregates(index) = new FloatMaxAggregate
    +              case DOUBLE =>
    +                aggregates(index) = new DoubleMaxAggregate
    +              case sqlType: SqlTypeName =>
    +                throw new PlanGenException("Max aggregate does no support type:" + sqlType)
    +            }
    +          }
    +          setAggregateDataOffset(index)
    +        }
    +        case _: SqlCountAggFunction =>
    +          aggregates(index) = new CountAggregate
    +          setAggregateDataOffset(index)
    +        case unSupported: SqlAggFunction =>
    +          throw new PlanGenException("unsupported Function: " + unSupported.getName)
    +      }
    +    }
    +
    +    // set the aggregate intermediate data start index in Row, and update current value.
    +    def setAggregateDataOffset(index: Int): Unit = {
    +      aggregates(index).setAggOffsetInRow(aggOffset)
    +      aggOffset += aggregates(index).intermediateDataType.length
    +    }
    +  }
    +
    +  private def createAggregateBufferDataType(
    +      groupings: Array[Int],
    +      aggregates: Array[Aggregate[_]],
    +      inputType: RelDataType): RelRecordType = {
    +
    +    // get the field data types of group keys.
    +    val groupingTypes: Seq[RelDataTypeField] = groupings.map(inputType.getFieldList.get(_))
    +
    +    val aggPartialNameSuffix = "agg_buffer_"
    +    val factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
    +
    +    // get all the aggregate buffer value data type by their SqlTypeName.
    +    val aggTypes: Seq[RelDataTypeField] =
    +      aggregates.flatMap(_.intermediateDataType).zipWithIndex.map {
    +        case (typeName: SqlTypeName, index: Int) =>
    +          val fieldDataType = factory.createSqlType(typeName)
    +          new RelDataTypeFieldImpl(aggPartialNameSuffix + index,
    +            groupings.length + index, fieldDataType)
    +      }
    +
    +    val allFieldTypes = groupingTypes ++: aggTypes
    +    val partialType = new RelRecordType(allFieldTypes.toList)
    +    partialType
    +  }
    +
    +  private def getMatchedAggregateIndex(aggregate: FlinkAggregate,
    +      outputFieldType: RelDataTypeField): Int = {
    --- End diff --
    
    Change the return type to `Option[Int]` and return `None` instead of `-1`?
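
    A minimal sketch of what the suggested signature could look like. It uses plain field names instead of the Calcite types from the diff, and `OptionIndexSketch`, `findFieldIndex` and `describe` are hypothetical names chosen for illustration only:

    ```scala
    object OptionIndexSketch {
      // Hypothetical stand-in for getMatchedAggregateIndex: returns None when
      // no field matches, instead of the sentinel value -1.
      def findFieldIndex(fieldNames: Seq[String], wanted: String): Option[Int] = {
        val idx = fieldNames.indexOf(wanted)
        if (idx >= 0) Some(idx) else None
      }

      // The caller pattern-matches on the result instead of comparing against -1.
      def describe(fieldNames: Seq[String], wanted: String): String =
        findFieldIndex(fieldNames, wanted) match {
          case Some(i) => s"$wanted at index $i"
          case None => s"$wanted not found"
        }
    }
    ```

    With an `Option` the "not found" case is visible in the type, so a caller cannot accidentally use `-1` as an array index.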



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r55013155
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -0,0 +1,329 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.calcite.rel.`type`._
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
    +import org.apache.calcite.sql.fun._
    +import org.apache.flink.api.common.functions.{GroupReduceFunction, MapFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.table.plan.PlanGenException
    +import org.apache.flink.api.table.typeinfo.RowTypeInfo
    +import org.apache.flink.api.table.{Row, TableConfig}
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +object AggregateUtil {
    +
    +  type CalcitePair[T, R] = org.apache.calcite.util.Pair[T, R]
    +  type JavaList[T] = java.util.List[T]
    +
    +  /**
    +   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink 
    +   * operator functions:
    +   * [[org.apache.flink.api.common.functions.MapFunction]] and 
    +   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
    +   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well). 
    +   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the 
    +   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
    +   * format:
    +   *
    +   * {{{
    +   *                   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
    +   *                             |                          |
    +   *                             v                          v
    +   *        +---------+---------+--------+--------+--------+--------+
    +   *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
    +   *        +---------+---------+--------+--------+--------+--------+
    +   *                                              ^
    +   *                                              |
    +   *                               sum(y) aggOffsetInRow = 4
    +   * }}}
    +   *
    +   */
    +  def createOperatorFunctionsForAggregates(namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +      inputType: RelDataType, outputType: RelDataType,
    +      groupings: Array[Int]): AggregateResult = {
    +
    +    val aggregateFunctionsAndFieldIndexes =
    +      transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
    +    // store the aggregate fields of each aggregate function, by the same order of aggregates.
    +    val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
    +    val aggregates = aggregateFunctionsAndFieldIndexes._2
    +
    +    val mapFunction = (
    +        config: TableConfig,
    +        inputType: TypeInformation[Any],
    +        returnType: TypeInformation[Any]) => {
    +
    +      val aggregateMapFunction = new AggregateMapFunction[Row, Row](
    +        aggregates, aggFieldIndexes, groupings, returnType.asInstanceOf[RowTypeInfo])
    +
    +      aggregateMapFunction.asInstanceOf[MapFunction[Any, Any]]
    +    }
    +
    +    val bufferDataType: RelRecordType =
    +      createAggregateBufferDataType(groupings, aggregates, inputType)
    +
    +    // the mapping relation between field index of intermediate aggregate Row and output Row.
    +    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
    +
    +    // the mapping relation between aggregate function index in list and its corresponding
    +    // field index in output Row.
    +    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
    +
    +    if (groupingOffsetMapping.length != groupings.length ||
    +        aggOffsetMapping.length != namedAggregates.length) {
    +      throw new PlanGenException("Could not find output field in input data type " +
    +          "or aggregate functions.")
    +    }
    +
    +    val allPartialAggregate = aggregates.map(_.supportPartial).reduce(_ && _)
    +
    +    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
    +
    +    val reduceGroupFunction =
    +      if (allPartialAggregate) {
    +        (config: TableConfig, inputType: TypeInformation[Row], returnType: TypeInformation[Row]) =>
    +          new AggregateReduceCombineFunction(aggregates, groupingOffsetMapping,
    +            aggOffsetMapping, intermediateRowArity)
    +      } else {
    +        (config: TableConfig, inputType: TypeInformation[Row], returnType: TypeInformation[Row]) =>
    +          new AggregateReduceGroupFunction(aggregates, groupingOffsetMapping,
    +            aggOffsetMapping, intermediateRowArity)
    +      }
    +
    +    new AggregateResult(mapFunction, reduceGroupFunction, bufferDataType)
    +  }
    +
    +  private def transformToAggregateFunctions(
    +      aggregateCalls: Seq[AggregateCall],
    +      inputType: RelDataType,
    +      groupKeysCount: Int): (Array[Int], Array[Aggregate[_ <: Any]]) = {
    +
    +    // store the aggregate fields of each aggregate function, by the same order of aggregates.
    +    val aggFieldIndexes = new Array[Int](aggregateCalls.size)
    +    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
    +
    +    // set the start offset of aggregate buffer value to group keys' length, 
    +    // as all the group keys would be moved to the start fields of intermediate
    +    // aggregate data.
    +    var aggOffset = groupKeysCount
    +
    +    // create aggregate function instances by function type and aggregate field data type.
    +    aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
    +      val argList: util.List[Integer] = aggregateCall.getArgList
    +      if (argList.isEmpty) {
    +        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
    +          aggFieldIndexes(index) = 0
    +        } else {
    +          throw new PlanGenException("Aggregate fields should not be empty.")
    +        }
    +      } else {
    +        if (argList.size() > 1) {
    +          throw new PlanGenException("Currently, do not support aggregate on multi fields.")
    +        }
    +        aggFieldIndexes(index) = argList.get(0)
    +      }
    +      val sqlTypeName = inputType.getFieldList.get(aggFieldIndexes(index)).getType.getSqlTypeName
    +      aggregateCall.getAggregation match {
    +        case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => {
    +          sqlTypeName match {
    +            case TINYINT =>
    +              aggregates(index) = new ByteSumAggregate
    +            case SMALLINT =>
    +              aggregates(index) = new ShortSumAggregate
    +            case INTEGER =>
    +              aggregates(index) = new IntSumAggregate
    +            case BIGINT =>
    +              aggregates(index) = new LongSumAggregate
    +            case FLOAT =>
    +              aggregates(index) = new FloatSumAggregate
    +            case DOUBLE =>
    +              aggregates(index) = new DoubleSumAggregate
    +            case sqlType: SqlTypeName =>
    +              throw new PlanGenException("Sum aggregate does no support type:" + sqlType)
    +          }
    +          setAggregateDataOffset(index)
    --- End diff --
    
    I think the `setAggregateDataOffset(index)` call can be moved out of the match statement.



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54564836
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala ---
    @@ -17,70 +17,96 @@
      */
     package org.apache.flink.api.table.runtime.aggregate
     
    -import scala.reflect.runtime.universe._
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.table.Row
     
    -abstract class MinAggregate[T: Numeric] extends Aggregate[T] {
    +abstract  class MinAggregate[T: Numeric] extends Aggregate[T]{
     
    -  var result: T = _
    -  val numericResult = implicitly[Numeric[T]]
    +  private val numeric = implicitly[Numeric[T]]
     
    -  override def aggregate(value: Any): Unit = {
    -    val input: T = value.asInstanceOf[T]
    +  /**
    +   * Initiate the partial aggregate value in Row.
    +   * @param partial
    +   */
    +  override def initiate(partial: Row): Unit = {
    +    partial.setField(aggOffsetInRow, numeric.zero)
    --- End diff --
    
    This should be initialized to the maximum value of the data type instead of `numeric.zero`.
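
    To illustrate why zero is a problematic starting value, here is a minimal sketch that uses plain `Int` values rather than the `Row`-based `Aggregate` interface from the diff. `MinInitSketch` and `minWithInit` are hypothetical names:

    ```scala
    object MinInitSketch {
      // Folds the values with `min`, starting from the given initial value.
      def minWithInit(values: Seq[Int], init: Int): Int =
        values.foldLeft(init)((acc, v) => math.min(acc, v))
    }
    ```

    For the all-positive input `Seq(3, 7, 5)`, starting from `0` yields `0`, which is not even part of the input, while starting from `Int.MaxValue` yields `3`.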



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on the pull request:

    https://github.com/apache/flink/pull/1746#issuecomment-192826952
  
    Added null value handling to the aggregate functions, plus some code refactoring.
    Besides, I found that the previous Double/Float average function does not work in partial aggregate mode, so I changed it back to a sum and count calculation.
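
    A small sketch of why a sum and count pair merges cleanly across partial results. This is not the code from the PR; `AvgPartialSketch`, `SumCount` and `partial` are hypothetical names, and plain `Double` values stand in for the intermediate `Row` fields:

    ```scala
    object AvgPartialSketch {
      // Hypothetical intermediate state of an average: running sum and count.
      final case class SumCount(sum: Double, count: Long) {
        def add(v: Double): SumCount = SumCount(sum + v, count + 1)
        // Merging two partial states only needs field-wise addition.
        def merge(other: SumCount): SumCount =
          SumCount(sum + other.sum, count + other.count)
        def result: Double = sum / count
      }

      // Builds the partial state for one chunk of the input.
      def partial(values: Seq[Double]): SumCount =
        values.foldLeft(SumCount(0.0, 0L))((acc, v) => acc.add(v))
    }
    ```

    For the chunks `Seq(1.0, 2.0, 3.0)` and `Seq(10.0)`, merging the two states gives a sum of 16.0 over 4 values, so the average is 4.0. Averaging the two partial averages (2.0 and 10.0) would give 6.0 instead, which is why the count has to travel with the sum.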



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1746#issuecomment-191689616
  
    Hi @ChengXiangLi, you need to copy this code snippet from `DataSetGroupReduce.translateToPlan()` to `DataSetMap.translateToPlan()` to enforce `Row` as the input type:
    
    ```
        expectedType match {
          case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
            throw new PlanGenException("GroupReduce operations currently only support returning Rows.")
          case _ => // ok
        }

        val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
          config,
          // tell the input operator that this operator currently only supports Rows as input
          Some(TypeConverter.DEFAULT_ROW_TYPE))
    ```



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54547448
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceGroupFunction.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import java.lang.Iterable
    +
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction, RichMapPartitionFunction}
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    + * It wraps the aggregate logic inside of 
    + * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
    + *
    + * @param aggregates   The aggregate functions.
    + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row 
    + *                         and output Row.
    + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
    + *                         index in output Row.
    + */
    +class AggregateReduceGroupFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val groupKeysMapping: Array[(Int, Int)],
    +    private val aggregateMapping: Array[(Int, Int)])
    +    extends RichGroupReduceFunction[Row, Row] {
    +
    +  private final val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(groupKeysMapping)
    +  }
    +
    +  /**
    +   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
    +   * calculate aggregated values output by aggregate buffer, and set them into output 
    +   * Row based on the mapping relation between intermediate aggregate data and output data.
    +   *
    +   * @param records  Grouped intermediate aggregate Rows iterator.
    +   * @param out The collector to hand results to.
    +   *
    +   */
    +  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
    +    
    +    val inputItr = records.iterator
    +    val buffer = inputItr.next
    +    while (inputItr.hasNext) {
    +      val next = inputItr.next
    +      aggregates.foreach(_.merge(next, buffer))
    +    }
    +
    +    val output: Row = new Row(finalRowLength)
    --- End diff --
    
    We can reuse the `output` object across `reduce()` calls.
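
    A rough sketch of the reuse pattern, with a plain `Array[Any]` standing in for `Row` and a callback standing in for the `Collector`. `ReuseOutputSketch` and `SumReducer` are hypothetical and do not mirror the Flink interfaces:

    ```scala
    object ReuseOutputSketch {
      // Hypothetical reducer that sums each group and emits (key, sum).
      class SumReducer {
        // Allocated once when the reducer is created, overwritten on every call.
        private val output = new Array[Any](2)

        def reduce(key: String, values: Iterator[Int], emit: Array[Any] => Unit): Unit = {
          output(0) = key
          output(1) = values.sum
          emit(output)
        }
      }
    }
    ```

    This saves one allocation per group, but it is only safe when the consumer does not keep a reference to the emitted object between calls, because the next call overwrites its fields.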



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r55013024
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -0,0 +1,329 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.calcite.rel.`type`._
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
    +import org.apache.calcite.sql.fun._
    +import org.apache.flink.api.common.functions.{GroupReduceFunction, MapFunction}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.table.plan.PlanGenException
    +import org.apache.flink.api.table.typeinfo.RowTypeInfo
    +import org.apache.flink.api.table.{Row, TableConfig}
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +object AggregateUtil {
    +
    +  type CalcitePair[T, R] = org.apache.calcite.util.Pair[T, R]
    +  type JavaList[T] = java.util.List[T]
    +
    +  /**
    +   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink 
    +   * operator functions:
    +   * [[org.apache.flink.api.common.functions.MapFunction]] and 
    +   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
    +   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well). 
    +   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the 
    +   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
    +   * format:
    +   *
    +   * {{{
    +   *                   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
    +   *                             |                          |
    +   *                             v                          v
    +   *        +---------+---------+--------+--------+--------+--------+
    +   *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
    +   *        +---------+---------+--------+--------+--------+--------+
    +   *                                              ^
    +   *                                              |
    +   *                               sum(y) aggOffsetInRow = 4
    +   * }}}
    +   *
    +   */
    +  def createOperatorFunctionsForAggregates(namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +      inputType: RelDataType, outputType: RelDataType,
    +      groupings: Array[Int]): AggregateResult = {
    +
    +    val aggregateFunctionsAndFieldIndexes =
    +      transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
    +    // store the aggregate fields of each aggregate function, by the same order of aggregates.
    +    val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
    +    val aggregates = aggregateFunctionsAndFieldIndexes._2
    +
    +    val mapFunction = (
    +        config: TableConfig,
    +        inputType: TypeInformation[Any],
    +        returnType: TypeInformation[Any]) => {
    +
    +      val aggregateMapFunction = new AggregateMapFunction[Row, Row](
    +        aggregates, aggFieldIndexes, groupings, returnType.asInstanceOf[RowTypeInfo])
    +
    +      aggregateMapFunction.asInstanceOf[MapFunction[Any, Any]]
    +    }
    +
    +    val bufferDataType: RelRecordType =
    +      createAggregateBufferDataType(groupings, aggregates, inputType)
    +
    +    // the mapping relation between field index of intermediate aggregate Row and output Row.
    +    val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
    +
    +    // the mapping relation between aggregate function index in list and its corresponding
    +    // field index in output Row.
    +    val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
    +
    +    if (groupingOffsetMapping.length != groupings.length ||
    +        aggOffsetMapping.length != namedAggregates.length) {
    +      throw new PlanGenException("Could not find output field in input data type " +
    +          "or aggregate functions.")
    +    }
    +
    +    val allPartialAggregate = aggregates.map(_.supportPartial).reduce(_ && _)
    +
    +    val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
    +
    +    val reduceGroupFunction =
    +      if (allPartialAggregate) {
    +        (config: TableConfig, inputType: TypeInformation[Row], returnType: TypeInformation[Row]) =>
    +          new AggregateReduceCombineFunction(aggregates, groupingOffsetMapping,
    +            aggOffsetMapping, intermediateRowArity)
    +      } else {
    +        (config: TableConfig, inputType: TypeInformation[Row], returnType: TypeInformation[Row]) =>
    +          new AggregateReduceGroupFunction(aggregates, groupingOffsetMapping,
    +            aggOffsetMapping, intermediateRowArity)
    +      }
    +
    +    new AggregateResult(mapFunction, reduceGroupFunction, bufferDataType)
    +  }
    +
    +  private def transformToAggregateFunctions(
    +      aggregateCalls: Seq[AggregateCall],
    +      inputType: RelDataType,
    +      groupKeysCount: Int): (Array[Int], Array[Aggregate[_ <: Any]]) = {
    +
    +    // store the aggregate fields of each aggregate function, by the same order of aggregates.
    +    val aggFieldIndexes = new Array[Int](aggregateCalls.size)
    +    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
    +
    +    // set the start offset of aggregate buffer value to group keys' length, 
    +    // as all the group keys would be moved to the start fields of intermediate
    +    // aggregate data.
    +    var aggOffset = groupKeysCount
    +
    +    // create aggregate function instances by function type and aggregate field data type.
    +    aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
    +      val argList: util.List[Integer] = aggregateCall.getArgList
    +      if (argList.isEmpty) {
    +        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
    +          aggFieldIndexes(index) = 0
    +        } else {
    +          throw new PlanGenException("Aggregate fields should not be empty.")
    +        }
    +      } else {
    +        if (argList.size() > 1) {
    +          throw new PlanGenException("Currently, do not support aggregate on multi fields.")
    +        }
    +        aggFieldIndexes(index) = argList.get(0)
    +      }
    +      val sqlTypeName = inputType.getFieldList.get(aggFieldIndexes(index)).getType.getSqlTypeName
    +      aggregateCall.getAggregation match {
    +        case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => {
    +          sqlTypeName match {
    --- End diff --
    
    We can make the code more concise by assigning the result of the `match` expression directly:
    ```
    aggregates(index) = sqlTypeName match {
      case TINYINT => new ByteSumAggregate
      ...
    }
    ```
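    
    For readers unfamiliar with the idiom, a minimal self-contained sketch of the same idea follows. It is not the PR's code: `SqlType`, its case objects, `IntSumAggregate`, and `sumAggregateFor` are hypothetical stand-ins for Calcite's `SqlTypeName` and the PR's aggregate classes. Because `match` is an expression in Scala, its value can be returned or assigned directly instead of assigning inside every branch.
    
    ```scala
    // Hypothetical stand-ins for Calcite's SqlTypeName values (assumption, not the real enum).
    sealed trait SqlType
    case object TINYINT extends SqlType
    case object INTEGER extends SqlType
    case object VARCHAR extends SqlType
    
    // Hypothetical stand-ins for the PR's aggregate classes.
    trait Aggregate
    class ByteSumAggregate extends Aggregate
    class IntSumAggregate extends Aggregate
    
    object MatchAsExpression {
      // The match expression itself yields the Aggregate, so no branch needs
      // its own assignment statement.
      def sumAggregateFor(sqlType: SqlType): Aggregate = sqlType match {
        case TINYINT => new ByteSumAggregate
        case INTEGER => new IntSumAggregate
        case other =>
          throw new IllegalArgumentException(s"Sum aggregate does not support type: $other")
      }
    }
    ```
    
    With this shape, the call site in the loop would reduce to a single assignment such as `aggregates(index) = sumAggregateFor(sqlTypeName)`.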



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1746#discussion_r54550819
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -0,0 +1,309 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import java.util
    +
    +import org.apache.calcite.rel.`type`._
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.`type`.{SqlTypeFactoryImpl, SqlTypeName}
    +import org.apache.calcite.sql.fun._
    +import org.apache.flink.api.common.functions.{GroupReduceFunction, MapFunction}
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.api.table.plan.PlanGenException
    +import org.apache.flink.api.table.plan.nodes.logical.FlinkAggregate
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable.ArrayBuffer
    +
    +object AggregateUtil {
    +
    +  /**
    +   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink 
    +   * operator functions:
    +   * [[org.apache.flink.api.common.functions.MapFunction]] and 
    +   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
    +   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well). 
    +   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the 
    +   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
    +   * format:
    +   *
    +   * {{{
    +   *                   avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
    +   *                             |                          |
    +   *                             v                          v
    +   *        +---------+---------+--------+--------+--------+--------+
    +   *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
    +   *        +---------+---------+--------+--------+--------+--------+
    +   *                                              ^
    +   *                                              |
    +   *                               sum(y) aggOffsetInRow = 4
    +   * }}}
    +   *
    +   */
    +  def createOperatorFunctionsForAggregates(aggregate: FlinkAggregate,
    +      inputType: RelDataType, outputType: RelDataType,
    +      groupings: Array[Int]): AggregateResult = {
    +
    +    val aggregateCalls: Seq[AggregateCall] = aggregate.getAggCallList
    +    // store the aggregate fields of each aggregate function, by the same order of aggregates.
    +    val aggFieldIndexes = new Array[Int](aggregateCalls.size)
    +    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
    +
    +    transformToAggregateFunctions(aggregateCalls, aggFieldIndexes,
    --- End diff --
    
    It's better Scala style to define a function that returns values. If, as in this case, two values are returned, you can return a tuple.
    ```
    private def transformToAggregateFunctions(
      aggregateCalls: Seq[AggregateCall], 
      inputType: RelDataType, 
      groupKeysCount: Int): (Array[Int], Array[Aggregate[_ <: Any]]) = 
    {
      // ...
    }
    ```
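    
    A rough illustration of the tuple-returning style, under simplifying assumptions: each aggregate call is modeled here as a hypothetical `(functionName, fieldIndex)` pair rather than a Calcite `AggregateCall`, and function names stand in for `Aggregate` instances. The caller destructures the returned tuple instead of passing in arrays to be filled.
    
    ```scala
    object TupleReturn {
      // Hypothetical simplification: a call is (functionName, fieldIndex).
      // Returns the field indexes and the function names as a pair,
      // both in the same order as the input calls.
      def transformToAggregateFunctions(
          calls: Seq[(String, Int)]): (Array[Int], Array[String]) = {
        val fieldIndexes = calls.map(_._2).toArray
        val functionNames = calls.map(_._1).toArray
        (fieldIndexes, functionNames)
      }
    
      def main(args: Array[String]): Unit = {
        // Destructure the tuple at the call site.
        val (aggFieldIndexes, aggregates) =
          transformToAggregateFunctions(Seq(("SUM", 2), ("COUNT", 0)))
        println(aggFieldIndexes.mkString(","))
        println(aggregates.mkString(","))
      }
    }
    ```
    
    Returning the values keeps the function free of side effects on its arguments, which makes it easier to test in isolation.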



[GitHub] flink pull request: [FLINK-3474] support partial aggregate

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1746#issuecomment-190723562
  
    Thanks for the PR @ChengXiangLi. 
    I made a few comments and suggestions.
    Thanks, Fabian

