Posted to issues@flink.apache.org by vasia <gi...@git.apache.org> on 2016/02/08 15:05:46 UTC

[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

GitHub user vasia opened a pull request:

    https://github.com/apache/flink/pull/1600

    [FLINK-3226] Translate logical aggregations to physical

    This PR builds on #1567 and addresses @fhueske's comments on translating aggregations.
    Join translation is not part of this PR.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vasia/flink LogicalToPhysical

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1600.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1600
    
----
commit 6676aab520bd648c360f70a2047196d004ce1d31
Author: chengxiang li <ch...@intel.com>
Date:   2016-02-01T07:18:14Z

    [Flink-3226] Translate logical plan FlinkRels into physical plan DataSetRels.

commit cf41b740d32768185ec692e93754056ff6a16b59
Author: vasia <va...@apache.org>
Date:   2016-02-04T14:53:52Z

    [FLINK-3226] implement GroupReduce translation; enable tests for supported operations
    
    - compute average as sum and count for byte, short and int type to avoid rounding errors

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52305988
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import scala.reflect.runtime.universe._
    +
    +abstract class MinAggregate[T: Numeric] extends Aggregate[T] {
    +
    +  var result: T = _
    +
    +  override def aggregate(value: Any): Unit = {
    +    val input: T = value.asInstanceOf[T]
    +    val numericResult = implicitly[Numeric[T]]
    --- End diff --
    
    Can this be moved out of the `aggregate` method?
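
One way to read this suggestion, as a minimal sketch: resolve the `Numeric[T]` instance once per aggregate object and keep it in a field, so that `aggregate` does no implicit lookup per record. The `Aggregate` trait below is a stand-in reconstructed from the quoted diffs, and `IntMinAggregate` is a hypothetical subclass added only for illustration; neither is claimed to match the final code in the PR.

```scala
// Stand-in for the PR's Aggregate trait, reconstructed from the quoted diffs.
trait Aggregate[T] {
  def initiateAggregate: Unit
  def aggregate(value: Any): Unit
  def getAggregated(): T
}

abstract class MinAggregate[T: Numeric] extends Aggregate[T] {
  // Resolved once when the aggregate is constructed, not on every record.
  private val numeric: Numeric[T] = implicitly[Numeric[T]]

  // Placeholder start value; subclasses reset it in initiateAggregate.
  protected var result: T = numeric.zero

  override def aggregate(value: Any): Unit = {
    val input = value.asInstanceOf[T]
    // Numeric extends Ordering, so min is available directly.
    result = numeric.min(result, input)
  }

  override def getAggregated(): T = result
}

// Hypothetical concrete subclass, for illustration only.
class IntMinAggregate extends MinAggregate[Int] {
  override def initiateAggregate: Unit = { result = Int.MaxValue }
}
```

Calling `initiateAggregate` and then feeding 5, 3 and 9 through `aggregate` leaves 3 in `getAggregated()`.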



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1600#issuecomment-182817751
  
    Looks pretty good. Can you check the `@Ignore` annotations?



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52192307
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions.aggregate
    +
    +abstract class MaxAggregate[T] extends Aggregate[T]{
    +
    +}
    +
    +class TinyIntMaxAggregate extends MaxAggregate[Byte] {
    +  private var max = Byte.MinValue
    +
    +  override def initiateAggregate: Unit = {
    +    max = Byte.MinValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    val current = value.asInstanceOf[Byte]
    +    if (current > max) {
    +      max = current
    +    }
    +  }
    +
    +  override def getAggregated(): Byte = {
    +    max
    +  }
    +}
    +
    +class SmallIntMaxAggregate extends MaxAggregate[Short] {
    +  private var max = Short.MinValue
    +
    +  override def initiateAggregate: Unit = {
    +    max = Short.MinValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    val current = value.asInstanceOf[Short]
    +    if (current > max) {
    +      max = current
    +    }
    +  }
    +
    +  override def getAggregated(): Short = {
    +    max
    +  }
    +}
    +
    +class IntMaxAggregate extends MaxAggregate[Int] {
    +  private var max = Int.MinValue
    +
    +  override def initiateAggregate: Unit = {
    +    max = Int.MinValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    val current = value.asInstanceOf[Int]
    +    if (current > max) {
    +      max = current
    +    }
    +  }
    +
    +  override def getAggregated(): Int = {
    +    max
    +  }
    +}
    +
    +class LongMaxAggregate extends MaxAggregate[Long] {
    +  private var max = Long.MinValue
    +
    +  override def initiateAggregate: Unit = {
    +    max = Int.MinValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    val current = value.asInstanceOf[Long]
    +    if (current > max) {
    +      max = current
    +    }
    +  }
    +
    +  override def getAggregated(): Long = {
    +    max
    +  }
    +}
    +
    +class FloatMaxAggregate extends MaxAggregate[Float] {
    +  private var max = Float.MinValue
    --- End diff --
    
    I missed correcting this, thanks!



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52305236
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala ---
    @@ -39,6 +46,10 @@ class DataSetJoinRule
         val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
         val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE)
     
    +    val joinKeys = getJoinKeys(join)
    --- End diff --
    
    I would exclude the changes in this file from the PR.



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52175960
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala ---
    @@ -58,19 +58,23 @@ class FlinkAggregate(
         )
       }
     
    -  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
    -
    -    val origCosts = super.computeSelfCost(planner)
    -    val deltaCost = planner.getCostFactory.makeHugeCost()
    -
    -    // only prefer aggregations with transformed Avg
    -    aggCalls.toList.foldLeft[RelOptCost](origCosts){
    -      (c: RelOptCost, a: AggregateCall) =>
    -        if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
    -          c.plus(deltaCost)
    -        } else {
    -          c
    -        }
    -    }
    -  }
    +//
    +// DO NOT ASSIGN HUGE COSTS TO PLANS WITH AVG AGGREGATIONS
    +//   ONLY NECESSARY IF AggregateReduceFunctionsRule IS ENABLED.  
    +//  
    +//  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
    +//
    +//    val origCosts = super.computeSelfCost(planner)
    +//    val deltaCost = planner.getCostFactory.makeHugeCost()
    +//
    +//    // only prefer aggregations with transformed Avg
    +//    aggCalls.toList.foldLeft[RelOptCost](origCosts){
    +//      (c: RelOptCost, a: AggregateCall) =>
    +//        if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
    +//          c.plus(deltaCost)
    +//        } else {
    +//          c
    +//        }
    +//    }
    +//  }
    --- End diff --
    
    Commented code?



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52204752
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions.aggregate
    +
    +abstract class SumAggregate[T] extends Aggregate[T]{
    +
    +}
    +
    +// TinyInt sum aggregate return Int as aggregated value.
    +class TinyIntSumAggregate extends SumAggregate[Int] {
    +
    +  private var sumValue: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +
    +  override def getAggregated(): Int = {
    +    sumValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Byte]
    +  }
    +}
    +
    +// SmallInt sum aggregate return Int as aggregated value.
    +class SmallIntSumAggregate extends SumAggregate[Int] {
    +
    +  private var sumValue: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +  override def getAggregated(): Int = {
    +    sumValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Short]
    +  }
    +}
    +
    +// Int sum aggregate return Int as aggregated value.
    +class IntSumAggregate extends SumAggregate[Int] {
    +
    +  private var sumValue: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +
    +  override def getAggregated(): Int = {
    +    sumValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Int]
    +  }
    +}
    +
    +// Long sum aggregate return Long as aggregated value.
    +class LongSumAggregate extends SumAggregate[Long] {
    +
    +  private var sumValue: Long = 0L
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Long]
    +  }
    +
    +  override def getAggregated(): Long = {
    +    sumValue
    +  }
    +}
    +
    +// Float sum aggregate return Float as aggregated value.
    +class FloatSumAggregate extends SumAggregate[Float] {
    +  private var sumValue: Float = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Float]
    +  }
    +
    +  override def getAggregated(): Float = {
    +    sumValue
    +  }
    +}
    +
    +// Double sum aggregate return Double as aggregated value.
    +class DoubleSumAggregate extends SumAggregate[Double] {
    +  private var sumValue: Double = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Double]
    +  }
    +
    +  override def getAggregated(): Double = {
    +    sumValue
    +  }
    +}
    --- End diff --
    
    I like @tillrohrmann's suggestion. Is there any reason not to use Scala's `Numeric`, though?
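
A rough sketch of what a `Numeric`-based sum could look like, assuming the `Aggregate` interface visible in the quoted diffs. The generic `SumAggregate` class below is hypothetical and only illustrates the idea of replacing the six per-type classes with one.

```scala
// Stand-in for the PR's Aggregate trait, reconstructed from the quoted diffs.
trait Aggregate[T] {
  def initiateAggregate: Unit
  def aggregate(value: Any): Unit
  def getAggregated(): T
}

// Hypothetical generic sum: one class for every type with a Numeric instance.
class SumAggregate[T](implicit numeric: Numeric[T]) extends Aggregate[T] {
  private var sum: T = numeric.zero

  override def initiateAggregate: Unit = { sum = numeric.zero }

  override def aggregate(value: Any): Unit = {
    // plus comes from the Numeric type class, so no per-type subclass is needed.
    sum = numeric.plus(sum, value.asInstanceOf[T])
  }

  override def getAggregated(): T = sum
}
```

One trade-off to keep in mind: the quoted `TinyIntSumAggregate` and `SmallIntSumAggregate` accumulate into an `Int`, while a plain `SumAggregate[Byte]` built this way would wrap around at the `Byte` range, so the widening would have to be handled separately.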



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52320399
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +abstract class AvgAggregate[T] extends Aggregate[T] {
    +
    +}
    +
    +// TinyInt average aggregate return Int as aggregated value.
    +class TinyIntAvgAggregate extends AvgAggregate[Byte] {
    +  private var sum: Long = 0
    +  private var count: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sum = 0
    +    count = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    count += 1
    +    sum += value.asInstanceOf[Byte]
    --- End diff --
    
    I know. However, aggregating on a `Double` and then calling `toInt` before returning the result led to rounding errors.
    For example, in one test the sum of 21 records was 231, so the average should be 11, but it was computed as 10.99999 and rounded down to 10. If you have any idea how to avoid such cases, please let me know!
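
To illustrate the sum-and-count approach described in the commit message, here is a minimal sketch. `ByteAvgSketch` is a hypothetical, simplified class (typed input, no `Aggregate` trait) and does not reproduce the code path that yielded 10.99999; it only shows that an exact `Long` sum divided once at the end gives 11 for the numbers in the example, and that `toInt` truncates rather than rounds.

```scala
// Hypothetical integer average: keep an exact Long sum and a count,
// and divide only once, when the result is requested.
class ByteAvgSketch {
  private var sum: Long = 0L
  private var count: Long = 0L

  def initiateAggregate(): Unit = {
    sum = 0L
    count = 0L
  }

  def aggregate(value: Byte): Unit = {
    sum += value
    count += 1
  }

  // Integer division; assumes at least one record was aggregated.
  def getAggregated(): Byte = (sum / count).toByte
}

object AvgDemo {
  // The values 1..21 add up to 231 over 21 records, as in the failing test.
  def exactAverage(): Byte = {
    val avg = new ByteAvgSketch
    avg.initiateAggregate()
    (1 to 21).foreach(i => avg.aggregate(i.toByte))
    avg.getAggregated()
  }

  // toInt truncates toward zero, which is why 10.99999 became 10.
  def truncated(): Int = 10.99999.toInt
}
```

Here `AvgDemo.exactAverage()` returns 11 and `AvgDemo.truncated()` returns 10.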



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52225586
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions.aggregate
    +
    +abstract class SumAggregate[T] extends Aggregate[T]{
    +
    +}
    +
    +// TinyInt sum aggregate return Int as aggregated value.
    +class TinyIntSumAggregate extends SumAggregate[Int] {
    +
    +  private var sumValue: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +
    +  override def getAggregated(): Int = {
    +    sumValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Byte]
    +  }
    +}
    +
    +// SmallInt sum aggregate return Int as aggregated value.
    +class SmallIntSumAggregate extends SumAggregate[Int] {
    +
    +  private var sumValue: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +  override def getAggregated(): Int = {
    +    sumValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Short]
    +  }
    +}
    +
    +// Int sum aggregate return Int as aggregated value.
    +class IntSumAggregate extends SumAggregate[Int] {
    +
    +  private var sumValue: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +
    +  override def getAggregated(): Int = {
    +    sumValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Int]
    +  }
    +}
    +
    +// Long sum aggregate return Long as aggregated value.
    +class LongSumAggregate extends SumAggregate[Long] {
    +
    +  private var sumValue: Long = 0L
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Long]
    +  }
    +
    +  override def getAggregated(): Long = {
    +    sumValue
    +  }
    +}
    +
    +// Float sum aggregate return Float as aggregated value.
    +class FloatSumAggregate extends SumAggregate[Float] {
    +  private var sumValue: Float = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Float]
    +  }
    +
    +  override def getAggregated(): Float = {
    +    sumValue
    +  }
    +}
    +
    +// Double sum aggregate return Double as aggregated value.
    +class DoubleSumAggregate extends SumAggregate[Double] {
    +  private var sumValue: Double = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Double]
    +  }
    +
    +  override def getAggregated(): Double = {
    +    sumValue
    +  }
    +}
    --- End diff --
    
    No objections. I was just thinking about future improvements.



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52176834
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions
    +
    +import java.lang.Iterable
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.table.plan.functions.aggregate.Aggregate
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +import scala.collection.JavaConversions._
    +import org.apache.flink.api.table.Row
    +
    +/**
    + * A wrapper Flink GroupReduceOperator UDF of aggregates. It takes the grouped data as input,
    + * feed to the aggregates, and collect the record with aggregated value.
    + *
    + * @param aggregates SQL aggregate functions.
    + * @param fields The grouped keys' indices in the input.
    + * @param groupingKeys The grouping keys' positions.
    + */
    +class AggregateFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val fields: Array[Int],
    +    private val groupingKeys: Array[Int]) extends RichGroupReduceFunction[Row, Row] {
    --- End diff --
    
    I would move all runtime-related functions to `org.apache.flink.table.runtime`. IMO `plan` is not the right place for `functions`.



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1600#issuecomment-181794348
  
    Thanks for the feedback @tillrohrmann, @twalthr!
    I've moved the classes to `org.apache.flink.api.table.runtime` and tried to shorten the aggregate code using `Numeric`. I left only `AvgAggregate` as is, because integer averages and float/double averages are computed differently. We can always replace it with code generation later, as @twalthr suggested.



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52175572
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions.aggregate
    +
    +abstract class MaxAggregate[T] extends Aggregate[T]{
    +
    +}
    +
    +class TinyIntMaxAggregate extends MaxAggregate[Byte] {
    +  private var max = Byte.MinValue
    +
    +  override def initiateAggregate: Unit = {
    +    max = Byte.MinValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    val current = value.asInstanceOf[Byte]
    +    if (current > max) {
    +      max = current
    +    }
    +  }
    +
    +  override def getAggregated(): Byte = {
    +    max
    +  }
    +}
    +
    +class SmallIntMaxAggregate extends MaxAggregate[Short] {
    +  private var max = Short.MinValue
    +
    +  override def initiateAggregate: Unit = {
    +    max = Short.MinValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    val current = value.asInstanceOf[Short]
    +    if (current > max) {
    +      max = current
    +    }
    +  }
    +
    +  override def getAggregated(): Short = {
    +    max
    +  }
    +}
    +
    +class IntMaxAggregate extends MaxAggregate[Int] {
    +  private var max = Int.MinValue
    +
    +  override def initiateAggregate: Unit = {
    +    max = Int.MinValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    val current = value.asInstanceOf[Int]
    +    if (current > max) {
    +      max = current
    +    }
    +  }
    +
    +  override def getAggregated(): Int = {
    +    max
    +  }
    +}
    +
    +class LongMaxAggregate extends MaxAggregate[Long] {
    +  private var max = Long.MinValue
    +
    +  override def initiateAggregate: Unit = {
    +    max = Int.MinValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    val current = value.asInstanceOf[Long]
    +    if (current > max) {
    +      max = current
    +    }
    +  }
    +
    +  override def getAggregated(): Long = {
    +    max
    +  }
    +}
    +
    +class FloatMaxAggregate extends MaxAggregate[Float] {
    +  private var max = Float.MinValue
    +
    +  override def initiateAggregate: Unit = {
    +    max = Int.MinValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    val current = value.asInstanceOf[Float]
    +    if (current > max) {
    +      max = current
    +    }
    +  }
    +
    +  override def getAggregated(): Float = {
    +    max
    +  }
    +}
    +
    +class DoubleMaxAggregate extends MaxAggregate[Double] {
    +  private var max = Double.MinValue
    +
    +  override def initiateAggregate: Unit = {
    +    max = Int.MaxValue
    --- End diff --
    
    Why `Int.MaxValue` here?
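
For context, a sketch of what the reset presumably should look like: a max aggregate has to restart from the lowest value of its own type, otherwise any input below the start value is ignored. `DoubleMaxSketch` is a hypothetical, trimmed-down version of the quoted class. Note that in Scala `Double.MinValue` is the most negative finite double, unlike Java's `Double.MIN_VALUE`, which is the smallest positive one.

```scala
// Hypothetical corrected variant of the quoted DoubleMaxAggregate.
class DoubleMaxSketch {
  private var max: Double = Double.MinValue

  // Reset to the lowest Double, not Int.MaxValue.
  def initiateAggregate(): Unit = {
    max = Double.MinValue
  }

  def aggregate(value: Any): Unit = {
    val current = value.asInstanceOf[Double]
    if (current > max) {
      max = current
    }
  }

  def getAggregated(): Double = max
}
```

With this reset, aggregating -5.0 and -1.0e10 yields -5.0; with the `Int.MaxValue` reset from the diff, the same inputs would leave 2147483647.0 as the result.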



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52190250
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions.aggregate
    +
    +abstract class SumAggregate[T] extends Aggregate[T]{
    +
    +}
    +
    +// TinyInt sum aggregate return Int as aggregated value.
    +class TinyIntSumAggregate extends SumAggregate[Int] {
    +
    +  private var sumValue: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +
    +  override def getAggregated(): Int = {
    +    sumValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Byte]
    +  }
    +}
    +
    +// SmallInt sum aggregate return Int as aggregated value.
    +class SmallIntSumAggregate extends SumAggregate[Int] {
    +
    +  private var sumValue: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +  override def getAggregated(): Int = {
    +    sumValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Short]
    +  }
    +}
    +
    +// Int sum aggregate return Int as aggregated value.
    +class IntSumAggregate extends SumAggregate[Int] {
    +
    +  private var sumValue: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +
    +  override def getAggregated(): Int = {
    +    sumValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Int]
    +  }
    +}
    +
    +// Long sum aggregate return Long as aggregated value.
    +class LongSumAggregate extends SumAggregate[Long] {
    +
    +  private var sumValue: Long = 0L
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Long]
    +  }
    +
    +  override def getAggregated(): Long = {
    +    sumValue
    +  }
    +}
    +
    +// Float sum aggregate return Float as aggregated value.
    +class FloatSumAggregate extends SumAggregate[Float] {
    +  private var sumValue: Float = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Float]
    +  }
    +
    +  override def getAggregated(): Float = {
    +    sumValue
    +  }
    +}
    +
    +// Double sum aggregate return Double as aggregated value.
    +class DoubleSumAggregate extends SumAggregate[Double] {
    +  private var sumValue: Double = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Double]
    +  }
    +
    +  override def getAggregated(): Double = {
    +    sumValue
    +  }
    +}
    --- End diff --
    
    We could also replace it later using code generation.



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by vasia <gi...@git.apache.org>.
Github user vasia closed the pull request at:

    https://github.com/apache/flink/pull/1600



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52306203
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MinAggregate.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +import scala.reflect.runtime.universe._
    +
    +abstract class MinAggregate[T: Numeric] extends Aggregate[T] {
    +
    +  var result: T = _
    +
    +  override def aggregate(value: Any): Unit = {
    +    val input: T = value.asInstanceOf[T]
    +    val numericResult = implicitly[Numeric[T]]
    +
    +    result = numericResult.min(result, input)
    +  }
    +
    +  override def getAggregated(): T = {
    +    result
    +  }
    +
    +}
    +
    +// Numeric doesn't have max value
    +class TinyMinAggregate extends MinAggregate[Byte] {
    +
    +    override def initiateAggregate: Unit = {
    --- End diff --
    
    Indentation (also in the following classes).



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52306136
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/MaxAggregate.scala ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +abstract class MaxAggregate[T: Numeric] extends Aggregate[T] {
    +
    +  var result: T = _
    +
    +  override def aggregate(value: Any): Unit = {
    +    val input: T = value.asInstanceOf[T]
    +    val numericResult = implicitly[Numeric[T]]
    --- End diff --
    
    Move out of `aggregate()`?
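    
    A rough sketch of what that could look like, assuming the `Aggregate` interface has the three methods used in this diff (the `Aggregate` trait below is a reduced stand-in, `HoistedNumericSketch` is a hypothetical wrapper name, and initializing `result` with `zero` is my choice, not the PR's):
    
    ```scala
    object HoistedNumericSketch {
      // Reduced stand-in for the PR's Aggregate trait.
      trait Aggregate[T] {
        def initiateAggregate: Unit
        def aggregate(value: Any): Unit
        def getAggregated(): T
      }

      abstract class MaxAggregate[T: Numeric] extends Aggregate[T] {
        // Resolved once per instance instead of once per aggregate() call.
        private val numeric = implicitly[Numeric[T]]

        protected var result: T = numeric.zero

        override def aggregate(value: Any): Unit = {
          result = numeric.max(result, value.asInstanceOf[T])
        }

        override def getAggregated(): T = result
      }

      class IntMaxAggregate extends MaxAggregate[Int] {
        // Numeric has no minimum value, so each subclass supplies its own.
        override def initiateAggregate: Unit = { result = Int.MinValue }
      }
    }
    ```
    
    The lookup itself is cheap, so this is mostly about keeping the per-record path minimal.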



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1600#issuecomment-182872698
  
    @fhueske I fixed the aggregation hashCode and enabled the ignored tests. Let me know if it's OK now!



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52192508
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala ---
    @@ -58,19 +58,23 @@ class FlinkAggregate(
         )
       }
     
    -  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
    -
    -    val origCosts = super.computeSelfCost(planner)
    -    val deltaCost = planner.getCostFactory.makeHugeCost()
    -
    -    // only prefer aggregations with transformed Avg
    -    aggCalls.toList.foldLeft[RelOptCost](origCosts){
    -      (c: RelOptCost, a: AggregateCall) =>
    -        if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
    -          c.plus(deltaCost)
    -        } else {
    -          c
    -        }
    -    }
    -  }
    +//
    +// DO NOT ASSIGN HUGE COSTS TO PLANS WITH AVG AGGREGATIONS
    +//   ONLY NECESSARY IF AggregateReduceFunctionsRule IS ENABLED.  
    +//  
    +//  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
    +//
    +//    val origCosts = super.computeSelfCost(planner)
    +//    val deltaCost = planner.getCostFactory.makeHugeCost()
    +//
    +//    // only prefer aggregations with transformed Avg
    +//    aggCalls.toList.foldLeft[RelOptCost](origCosts){
    +//      (c: RelOptCost, a: AggregateCall) =>
    +//        if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
    +//          c.plus(deltaCost)
    +//        } else {
    +//          c
    +//        }
    +//    }
    +//  }
    --- End diff --
    
    Yes, we decided to disable `AggregateReduceFunctionsRule` temporarily, thus the note on top.
    I could remove it completely if you think it'd be better.



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52192844
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions
    +
    +import java.lang.Iterable
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.table.plan.functions.aggregate.Aggregate
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +import scala.collection.JavaConversions._
    +import org.apache.flink.api.table.Row
    +
    +/**
    + * A wrapper Flink GroupReduceOperator UDF of aggregates. It takes the grouped data as input,
    + * feed to the aggregates, and collect the record with aggregated value.
    + *
    + * @param aggregates SQL aggregate functions.
    + * @param fields The grouped keys' indices in the input.
    + * @param groupingKeys The grouping keys' positions.
    + */
    +class AggregateFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val fields: Array[Int],
    +    private val groupingKeys: Array[Int]) extends RichGroupReduceFunction[Row, Row] {
    --- End diff --
    
    Sounds reasonable. Together with the expression functions or separately, e.g. in `org.apache.flink.table.runtime.functions`?



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52304432
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/logical/FlinkAggregate.scala ---
    @@ -58,19 +58,23 @@ class FlinkAggregate(
         )
       }
     
    -  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
    -
    -    val origCosts = super.computeSelfCost(planner)
    -    val deltaCost = planner.getCostFactory.makeHugeCost()
    -
    -    // only prefer aggregations with transformed Avg
    -    aggCalls.toList.foldLeft[RelOptCost](origCosts){
    -      (c: RelOptCost, a: AggregateCall) =>
    -        if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
    -          c.plus(deltaCost)
    -        } else {
    -          c
    -        }
    -    }
    -  }
    +//
    +// DO NOT ASSIGN HUGE COSTS TO PLANS WITH AVG AGGREGATIONS
    +//   ONLY NECESSARY IF AggregateReduceFunctionsRule IS ENABLED.  
    +//  
    +//  override def computeSelfCost (planner: RelOptPlanner): RelOptCost = {
    +//
    +//    val origCosts = super.computeSelfCost(planner)
    +//    val deltaCost = planner.getCostFactory.makeHugeCost()
    +//
    +//    // only prefer aggregations with transformed Avg
    +//    aggCalls.toList.foldLeft[RelOptCost](origCosts){
    +//      (c: RelOptCost, a: AggregateCall) =>
    +//        if (a.getAggregation.isInstanceOf[SqlAvgAggFunction]) {
    +//          c.plus(deltaCost)
    +//        } else {
    +//          c
    +//        }
    +//    }
    +//  }
    --- End diff --
    
    It's fine to drop the code. We can improve the aggregations later.



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52175496
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions.aggregate
    +
    +abstract class MaxAggregate[T] extends Aggregate[T]{
    +
    +}
    +
    +class TinyIntMaxAggregate extends MaxAggregate[Byte] {
    +  private var max = Byte.MinValue
    +
    +  override def initiateAggregate: Unit = {
    +    max = Byte.MinValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    val current = value.asInstanceOf[Byte]
    +    if (current > max) {
    +      max = current
    +    }
    +  }
    +
    +  override def getAggregated(): Byte = {
    +    max
    +  }
    +}
    +
    +class SmallIntMaxAggregate extends MaxAggregate[Short] {
    +  private var max = Short.MinValue
    +
    +  override def initiateAggregate: Unit = {
    +    max = Short.MinValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    val current = value.asInstanceOf[Short]
    +    if (current > max) {
    +      max = current
    +    }
    +  }
    +
    +  override def getAggregated(): Short = {
    +    max
    +  }
    +}
    +
    +class IntMaxAggregate extends MaxAggregate[Int] {
    +  private var max = Int.MinValue
    +
    +  override def initiateAggregate: Unit = {
    +    max = Int.MinValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    val current = value.asInstanceOf[Int]
    +    if (current > max) {
    +      max = current
    +    }
    +  }
    +
    +  override def getAggregated(): Int = {
    +    max
    +  }
    +}
    +
    +class LongMaxAggregate extends MaxAggregate[Long] {
    +  private var max = Long.MinValue
    +
    +  override def initiateAggregate: Unit = {
    +    max = Int.MinValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    val current = value.asInstanceOf[Long]
    +    if (current > max) {
    +      max = current
    +    }
    +  }
    +
    +  override def getAggregated(): Long = {
    +    max
    +  }
    +}
    +
    +class FloatMaxAggregate extends MaxAggregate[Float] {
    +  private var max = Float.MinValue
    --- End diff --
    
    Why `Float.MinValue` here and in `initiateAggregate` `Int.MinValue`?
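    
    The mismatch matters for inputs below the `Int` range. A minimal sketch (`FloatResetSketch` and `maxFrom` are hypothetical names; the fold stands in for repeated `aggregate` calls):
    
    ```scala
    object FloatResetSketch {
      // The two candidate reset values from the diff.
      val intReset: Float = Int.MinValue.toFloat // -2^31, roughly -2.1e9
      val floatReset: Float = Float.MinValue     // lowest finite Float, roughly -3.4e38

      // Running maximum starting from a given reset value.
      def maxFrom(reset: Float, values: Seq[Float]): Float =
        values.foldLeft(reset)((acc, v) => if (v > acc) v else acc)

      // Both inputs are smaller than Int.MinValue.
      val input = Seq(-3.0e9f, -5.0e9f)
      val wrong = maxFrom(intReset, input)   // stays at the reset value
      val right = maxFrom(floatReset, input) // picks the larger input
    }
    ```
    
    So after a reset the aggregate would silently report about `-2.1e9` for groups whose values are all smaller than that.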



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52225277
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions
    +
    +import java.lang.Iterable
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.table.plan.functions.aggregate.Aggregate
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +import scala.collection.JavaConversions._
    +import org.apache.flink.api.table.Row
    +
    +/**
    + * A wrapper Flink GroupReduceOperator UDF of aggregates. It takes the grouped data as input,
    + * feed to the aggregates, and collect the record with aggregated value.
    + *
    + * @param aggregates SQL aggregate functions.
    + * @param fields The grouped keys' indices in the input.
    + * @param groupingKeys The grouping keys' positions.
    + */
    +class AggregateFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val fields: Array[Int],
    +    private val groupingKeys: Array[Int]) extends RichGroupReduceFunction[Row, Row] {
    --- End diff --
    
    I would move everything that is needed during runtime there: `AggregateFunction` in `org.apache.flink.table.runtime` and helper classes in sub-packages.



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52349128
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +abstract class AvgAggregate[T] extends Aggregate[T] {
    +
    +}
    +
    +// TinyInt average aggregate return Int as aggregated value.
    +class TinyIntAvgAggregate extends AvgAggregate[Byte] {
    +  private var sum: Long = 0
    +  private var count: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sum = 0
    +    count = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    count += 1
    +    sum += value.asInstanceOf[Byte]
    --- End diff --
    
    What about simply adding `0.5`?



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52181382
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions.aggregate
    +
    +abstract class SumAggregate[T] extends Aggregate[T]{
    +
    +}
    +
    +// TinyInt sum aggregate return Int as aggregated value.
    +class TinyIntSumAggregate extends SumAggregate[Int] {
    +
    +  private var sumValue: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +
    +  override def getAggregated(): Int = {
    +    sumValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Byte]
    +  }
    +}
    +
    +// SmallInt sum aggregate return Int as aggregated value.
    +class SmallIntSumAggregate extends SumAggregate[Int] {
    +
    +  private var sumValue: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +  override def getAggregated(): Int = {
    +    sumValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Short]
    +  }
    +}
    +
    +// Int sum aggregate return Int as aggregated value.
    +class IntSumAggregate extends SumAggregate[Int] {
    +
    +  private var sumValue: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +
    +  override def getAggregated(): Int = {
    +    sumValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Int]
    +  }
    +}
    +
    +// Long sum aggregate return Long as aggregated value.
    +class LongSumAggregate extends SumAggregate[Long] {
    +
    +  private var sumValue: Long = 0L
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Long]
    +  }
    +
    +  override def getAggregated(): Long = {
    +    sumValue
    +  }
    +}
    +
    +// Float sum aggregate return Float as aggregated value.
    +class FloatSumAggregate extends SumAggregate[Float] {
    +  private var sumValue: Float = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Float]
    +  }
    +
    +  override def getAggregated(): Float = {
    +    sumValue
    +  }
    +}
    +
    +// Double sum aggregate return Double as aggregated value.
    +class DoubleSumAggregate extends SumAggregate[Double] {
    +  private var sumValue: Double = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sumValue = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    sumValue += value.asInstanceOf[Double]
    +  }
    +
    +  override def getAggregated(): Double = {
    +    sumValue
    +  }
    +}
    --- End diff --
    
    This is a lot of duplicated code. Could we make it a bit less redundant by doing something similar to the following?
    
    ```
    class SAggregate[I: spire.math.Numeric, T: spire.math.Numeric] extends Aggregate[T] {
    
      var result: T = _
      /**
        * Initialize the aggregate state.
        */
      override def initiateAggregate: Unit = {
        result = implicitly[Numeric[T]].zero
      }
    
      /**
        * Feed the aggregate field value.
        *
        * @param value
        */
      override def aggregate(value: Any): Unit = {
        val input: I = value.asInstanceOf[I]
    
        val numericInput = implicitly[spire.math.Numeric[I]]
        val numericResult = implicitly[Numeric[T]]
    
        result = numericResult.plus(result, numericInput.toType[T](input))
      }
    
      /**
        * Return final aggregated value.
        *
        * @return
        */
      override def getAggregated(): T = {
        result
      }
    }
    ```
    
    The same holds true for the other aggregation functions.
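    
    If pulling in spire is not desired, a variant that relies only on the standard library's `scala.math.Numeric` might look like the sketch below. The `widen` parameter is a hypothetical replacement for spire's `toType`, the `Aggregate` trait is a reduced stand-in, and `GenericSumSketch` is just a wrapper name:
    
    ```scala
    object GenericSumSketch {
      // Reduced stand-in for the PR's Aggregate trait.
      trait Aggregate[T] {
        def initiateAggregate: Unit
        def aggregate(value: Any): Unit
        def getAggregated(): T
      }

      // I is the input field type, T the (possibly wider) result type.
      class SumAggregate[I, T](widen: I => T)(implicit num: Numeric[T])
          extends Aggregate[T] {

        private var result: T = num.zero

        override def initiateAggregate: Unit = { result = num.zero }

        override def aggregate(value: Any): Unit = {
          result = num.plus(result, widen(value.asInstanceOf[I]))
        }

        override def getAggregated(): T = result
      }

      // Byte input summed into an Int, as in TinyIntSumAggregate.
      class TinyIntSumAggregate extends SumAggregate[Byte, Int](b => b.toInt)
      // Input and result types coincide, so no widening is needed.
      class DoubleSumAggregate extends SumAggregate[Double, Double](d => d)
    }
    ```
    
    Each concrete aggregate then shrinks to a one-line class, at the cost of boxing inside the generic code path.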



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52304994
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetAggregateRule.scala ---
    @@ -37,14 +39,24 @@ class DataSetAggregateRule
         val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE)
         val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE)
     
    -    new DataSetReduce(
    +    val grouping = agg.getGroupSet.asList().map {
    --- End diff --
    
    I think you can use `ImmutableBitSet.toArray` to directly generate an int[].



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1600#issuecomment-182392029
  
    I've rebased this one on top of `tableOnCalcite` which includes #1595.
    
    Regarding the average aggregate, I went with @tillrohrmann's suggestion of adding 0.5. This means that the integer average of 1 and 2 now returns 2. Note that the previous Table API implementation returned 1 for this operation, because the integer average maintained a pair of sum and count.
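    
    To make the difference concrete, here is a minimal sketch of the two behaviours. The object and method names (`IntAvgRounding`, `truncatedAvg`, `roundedAvg`) are hypothetical and not taken from the PR, and the sketch assumes the 0.5 is added to a `Double` quotient before converting back to `Int`; the thread does not show how the PR applies it.
    
    ```scala
    // Hypothetical helpers for illustration only; not the PR's AvgAggregate code.
    object IntAvgRounding {
      // Sum and count with integer division: the fractional part is dropped,
      // so the average of 1 and 2 is 3 / 2 == 1.
      def truncatedAvg(values: Seq[Int]): Int = {
        require(values.nonEmpty, "average of an empty sequence is undefined")
        (values.map(_.toLong).sum / values.size).toInt
      }
    
      // Divide as Double, add 0.5, then truncate: (1.5 + 0.5).toInt == 2.
      // Assumption: the average is non-negative; negative averages would
      // need Math.round or floor to round correctly.
      def roundedAvg(values: Seq[Int]): Int = {
        require(values.nonEmpty, "average of an empty sequence is undefined")
        (values.map(_.toLong).sum.toDouble / values.size + 0.5).toInt
      }
    }
    ```
    
    With these definitions, `IntAvgRounding.truncatedAvg(Seq(1, 2))` yields 1 and `IntAvgRounding.roundedAvg(Seq(1, 2))` yields 2.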



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1600#discussion_r52306088
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala ---
    @@ -0,0 +1,145 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.runtime.aggregate
    +
    +abstract class AvgAggregate[T] extends Aggregate[T] {
    +
    +}
    +
    +// TinyInt average aggregate return Int as aggregated value.
    +class TinyIntAvgAggregate extends AvgAggregate[Byte] {
    +  private var sum: Long = 0
    +  private var count: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    sum = 0
    +    count = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    count += 1
    +    sum += value.asInstanceOf[Byte]
    --- End diff --
    
    The `aggregate` method was previously implemented as `avgValue += (current - avgValue) / count` to avoid overflow on `sum`.
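    
    A minimal sketch of that incremental update, next to the sum-then-divide form, might look as follows. The names `RunningAvg`, `sumThenDivide` and `incremental` are hypothetical, and a `Double` accumulator is assumed; the type of `avgValue` in the previous implementation is not shown in this thread.
    
    ```scala
    // Hypothetical illustration; not the previous Table API code.
    object RunningAvg {
      // Accumulates the full sum first. Long addition wraps around silently
      // on overflow, so very large inputs can produce a wrong result.
      def sumThenDivide(values: Seq[Long]): Long = {
        require(values.nonEmpty, "average of an empty sequence is undefined")
        values.sum / values.size
      }
    
      // Incremental mean: avg += (current - avg) / count.
      // The accumulator stays within the range of the inputs, so no large
      // intermediate sum is built up.
      def incremental(values: Seq[Long]): Double = {
        require(values.nonEmpty, "average of an empty sequence is undefined")
        var avg = 0.0
        var count = 0L
        for (current <- values) {
          count += 1
          avg += (current - avg) / count
        }
        avg
      }
    }
    ```
    
    For two inputs of `Long.MaxValue`, `sumThenDivide` wraps around and returns -1, while `incremental` stays at `Long.MaxValue.toDouble`. The trade-off is that the incremental form does one floating-point division per record.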



[GitHub] flink pull request: [FLINK-3226] Translate logical aggregations to...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1600#issuecomment-182957377
  
    It's all green :D Merging.

