Posted to issues@flink.apache.org by ChengXiangLi <gi...@git.apache.org> on 2016/02/01 08:34:34 UTC

[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

GitHub user ChengXiangLi opened a pull request:

    https://github.com/apache/flink/pull/1567

    [Flink-3226] Translate logical plan into physical plan

    This PR includes:
    * `DataSetAggregateRule` and `DataSetJoinRule` implementations.
    * Implementations of several common SQL aggregate functions, such as `SUM`, `COUNT`, `AVG`, `MIN` and `MAX`.
    * Some other minor refactors.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ChengXiangLi/flink flink-3226

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1567.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1567
    
----
commit 66e90743c9c56190d429b559c003eddef50fd70a
Author: chengxiang li <ch...@intel.com>
Date:   2016-02-01T07:18:14Z

    [Flink-3226] Translate logical plan FlinkRels into physical plan DataSetRels.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1567#discussion_r51726618
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions.aggregate
    +
    +abstract class MaxAggregate[T] extends Aggregate[T]{
    +
    +}
    +
    +class TinyIntMaxAggregate extends MaxAggregate[Byte] {
    +  private var max = Byte.MaxValue
    --- End diff --
    
    Should be initialized with `Byte.MinValue`
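
A minimal sketch of why the initial value matters. It assumes a simplified stand-in for the PR's `Aggregate[T]` interface; `SimpleAggregate` and `TinyIntMaxSketch` are hypothetical names, not classes from the PR. A running maximum that starts at `Byte.MaxValue` can never be replaced by any input, so it has to start at `Byte.MinValue`.

```scala
// Hypothetical, simplified stand-in for the PR's Aggregate[T] interface.
trait SimpleAggregate[T] {
  def initiateAggregate(): Unit
  def aggregate(value: Any): Unit
  def getAggregated(): T
}

// A running maximum starts from the smallest representable value,
// so that the first input is always greater than or equal to it.
class TinyIntMaxSketch extends SimpleAggregate[Byte] {
  private var max: Byte = Byte.MinValue

  override def initiateAggregate(): Unit = {
    max = Byte.MinValue
  }

  override def aggregate(value: Any): Unit = {
    val current = value.asInstanceOf[Byte]
    if (current > max) {
      max = current
    }
  }

  override def getAggregated(): Byte = max
}
```

With `Byte.MaxValue` as the starting point, the same class would report 127 for every group regardless of its contents.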



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1567#discussion_r51724939
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala ---
    @@ -0,0 +1,135 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions.aggregate
    +
    +import java.util
    +
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.fun._
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.table.plan.PlanGenException
    +import org.apache.flink.api.table.plan.functions.AggregateFunction
    +
    +object AggregateFactory {
    +
    +  def createAggregateInstance(aggregateCalls: Seq[AggregateCall]):
    +    RichGroupReduceFunction[Any, Any] = {
    +
    +    val fieldIndexes = new Array[Int](aggregateCalls.size)
    +    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
    +    aggregateCalls.zipWithIndex.map { case (aggregateCall, index) =>
    +      val sqlType = aggregateCall.getType
    +      val argList: util.List[Integer] = aggregateCall.getArgList
    +      // currently assume only aggregate on singleton field.
    +      if (argList.isEmpty) {
    +        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
    +          fieldIndexes(index) = 0
    +        } else {
    +          throw new PlanGenException("Aggregate fields should not be empty.")
    +        }
    +      } else {
    +        fieldIndexes(index) = argList.get(0);
    --- End diff --
    
    Ah, I guess it will be caught later when matching the aggregation type.



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1567#discussion_r51724615
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala ---
    @@ -0,0 +1,135 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions.aggregate
    +
    +import java.util
    +
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.fun._
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.table.plan.PlanGenException
    +import org.apache.flink.api.table.plan.functions.AggregateFunction
    +
    +object AggregateFactory {
    +
    +  def createAggregateInstance(aggregateCalls: Seq[AggregateCall]):
    +    RichGroupReduceFunction[Any, Any] = {
    +
    +    val fieldIndexes = new Array[Int](aggregateCalls.size)
    +    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
    +    aggregateCalls.zipWithIndex.map { case (aggregateCall, index) =>
    +      val sqlType = aggregateCall.getType
    +      val argList: util.List[Integer] = aggregateCall.getArgList
    +      // currently assume only aggregate on singleton field.
    +      if (argList.isEmpty) {
    +        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
    +          fieldIndexes(index) = 0
    +        } else {
    +          throw new PlanGenException("Aggregate fields should not be empty.")
    +        }
    +      } else {
    +        fieldIndexes(index) = argList.get(0);
    --- End diff --
    
    We should check that `argList.length == 1` and throw an exception otherwise.
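
A rough sketch of what such a check could look like, written without Calcite or Flink on the classpath. The helper `fieldIndexFor` and its `isCount` flag are hypothetical: `isCount` stands in for the `SqlCountAggFunction` test in the diff, and `IllegalArgumentException` stands in for `PlanGenException`.

```scala
import java.util

// Hypothetical helper: resolves the input field index for one aggregate call.
object ArgListCheck {
  def fieldIndexFor(argList: util.List[Integer], isCount: Boolean): Int = {
    if (argList.isEmpty) {
      // Only COUNT may be called without an argument (COUNT(*)).
      if (isCount) 0
      else throw new IllegalArgumentException("Aggregate fields should not be empty.")
    } else if (argList.size() != 1) {
      // Reject multi-argument aggregates explicitly instead of silently using the first field.
      throw new IllegalArgumentException(
        s"Expected exactly one aggregate field, but got ${argList.size()}.")
    } else {
      argList.get(0)
    }
  }
}
```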



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1567#discussion_r51725520
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AvgAggregate.scala ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions.aggregate
    +
    +abstract class AvgAggregate[T] extends Aggregate[T] {
    +
    +}
    +
    +// TinyInt average aggregate return Int as aggregated value.
    +class TinyIntAvgAggregate extends AvgAggregate[Int] {
    +  private var avgValue: Double = 0
    +  private var count: Int = 0
    +
    +  override def initiateAggregate: Unit = {
    +    avgValue = 0
    +    count = 0
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    count += 1
    +    val current = value.asInstanceOf[Byte]
    +    avgValue += (current - avgValue) / count
    +  }
    +
    +  override def getAggregated(): Int = {
    +    avgValue.toInt
    --- End diff --
    
    Shouldn't the return type be `Byte` as well?
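
If the result type were changed to `Byte` as the question suggests, the aggregate could look roughly like the sketch below. `TinyIntAvgSketch` is a hypothetical name and the class is detached from the PR's `Aggregate` hierarchy; the incremental mean is the same formula the diff uses.

```scala
// Hypothetical variant of the PR's TinyIntAvgAggregate that returns Byte.
class TinyIntAvgSketch {
  private var avgValue: Double = 0
  private var count: Int = 0

  def initiateAggregate(): Unit = {
    avgValue = 0
    count = 0
  }

  // Incremental mean, as in the diff: avg_n = avg_(n-1) + (x_n - avg_(n-1)) / n
  def aggregate(value: Any): Unit = {
    count += 1
    val current = value.asInstanceOf[Byte]
    avgValue += (current - avgValue) / count
  }

  // The mean of Byte values should lie within the Byte range,
  // so the result is truncated toward zero and narrowed back to Byte.
  def getAggregated(): Byte = avgValue.toByte
}
```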



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi commented on the pull request:

    https://github.com/apache/flink/pull/1567#issuecomment-177831170
  
    @twalthr and @fhueske, please help review.
    For other rules, `DataSetScanRule` and `DataSetUnionRule` should already be ready, and the missing Flink UDFs in `DataSetCalcRule`/`DataSetProjectRule`/`DataSetFilterRule` would be generated by Timo's work on code generation.



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1567#discussion_r51722014
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions
    +
    +import java.lang.Iterable
    +
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.table.plan.functions.aggregate.Aggregate
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    + * A wrapper Flink GroupReduceOperator UDF of aggregates, it takes the grouped data as input,
    + * feed to the aggregates, and collect the record with aggregated value.
    + *
    + * @param aggregates Sql aggregate functions.
    + * @param fields  The grouped keys' index.
    + */
    +class AggregateFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val fields: Array[Int]) extends RichGroupReduceFunction[Any, Any] {
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(fields)
    +    Preconditions.checkArgument(aggregates.size == fields.size)
    +
    +    aggregates.foreach(_.initiateAggregate)
    --- End diff --
    
    This needs to be done first in the `reduce` method. The `reduce` method is called several times for different groups.
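
The effect of resetting per group can be illustrated without Flink. In the sketch below all names (`ResettableAggregate`, `IntSumSketch`, `GroupReduceSketch`, `reduceGroup`) are hypothetical stand-ins: `reduceGroup` plays the role of `reduce`, which is invoked once per group on the same function instance.

```scala
// Hypothetical stand-ins; the PR's real classes depend on Flink.
trait ResettableAggregate {
  def initiateAggregate(): Unit
  def aggregate(value: Any): Unit
  def getAggregated(): Any
}

class IntSumSketch extends ResettableAggregate {
  private var sum: Int = 0
  override def initiateAggregate(): Unit = { sum = 0 }
  override def aggregate(value: Any): Unit = { sum += value.asInstanceOf[Int] }
  override def getAggregated(): Any = sum
}

// Mimics a group-reduce function: the same instance handles many groups,
// so aggregate state is reset at the start of every call.
class GroupReduceSketch(aggregates: Array[ResettableAggregate]) {
  def reduceGroup(records: Iterable[Any]): List[Any] = {
    aggregates.foreach(_.initiateAggregate())
    records.foreach(r => aggregates.foreach(_.aggregate(r)))
    aggregates.map(_.getAggregated()).toList
  }
}
```

If the reset happened only once at construction or in `open`, the second group's sum would also contain the first group's values.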



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1567#discussion_r51731066
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala ---
    @@ -39,6 +46,10 @@ class DataSetJoinRule
         val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
         val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE)
     
    +    val joinKeys = getJoinKeys(join)
    +
    +    // There would be a FlinkProject after FlinkJoin to handle the output fields afterward join,
    +    // so we do not need JoinFunction here by now.
    --- End diff --
    
    We need the `JoinFunction` to evaluate all non-equality join predicates. For instance if we have a join predicate such as `t1.a == t2.a AND t1.b < t2.b`, we would evaluate the `t1.a == t2.a` part by the joinKeys of the JoinOperator and the `t1.b < t2.b` as part of the `JoinFunction`.
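
The split described above can be sketched with plain Scala collections. The row types `T1Row` and `T2Row` and the helper `JoinSketch.join` are hypothetical and only model the idea: the equality part is resolved by matching on keys, while the remaining predicate is applied to each matched pair, which is the role a `JoinFunction` would take.

```scala
// Hypothetical row types for illustration only.
case class T1Row(a: Int, b: Int)
case class T2Row(a: Int, b: Int)

object JoinSketch {
  // Equality part (t1.a == t2.a): handled by looking up rows with the same key.
  // Residual part (e.g. t1.b < t2.b): evaluated on every matched pair.
  def join(
      left: Seq[T1Row],
      right: Seq[T2Row],
      residual: (T1Row, T2Row) => Boolean): Seq[(T1Row, T2Row)] = {
    val rightByKey: Map[Int, Seq[T2Row]] = right.groupBy(_.a)
    for {
      l <- left
      r <- rightByKey.getOrElse(l.a, Seq.empty)
      if residual(l, r)
    } yield (l, r)
  }
}
```

Calling `JoinSketch.join(t1, t2, (x, y) => x.b < y.b)` keeps only key-matched pairs that also satisfy the non-equality predicate.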



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1567#discussion_r51728841
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions
    +
    +import java.lang.Iterable
    +
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.table.plan.functions.aggregate.Aggregate
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    + * A wrapper Flink GroupReduceOperator UDF of aggregates, it takes the grouped data as input,
    + * feed to the aggregates, and collect the record with aggregated value.
    + *
    + * @param aggregates Sql aggregate functions.
    + * @param fields  The grouped keys' index.
    + */
    +class AggregateFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val fields: Array[Int]) extends RichGroupReduceFunction[Any, Any] {
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(fields)
    +    Preconditions.checkArgument(aggregates.size == fields.size)
    +
    +    aggregates.foreach(_.initiateAggregate)
    +  }
    +
    +  override def reduce(records: Iterable[Any], out: Collector[Any]): Unit = {
    +    var currentValue: Any = null
    +
    +    // iterate all input records, feed to each aggregate.
    +    val aggregateAndField = aggregates.zip(fields)
    +    records.foreach {
    +      value =>
    +        currentValue = value
    +        aggregateAndField.foreach {
    +          case (aggregate, field) =>
    +            aggregate.aggregate(FunctionUtils.getFieldValue(value, field))
    +        }
    +    }
    +
    +    // reuse the latest record, and set all the aggregated values.
    --- End diff --
    
    The output type is not necessarily equal to the input type. For example, if you have two aggregations on the same field (`"min(a) AS minA, max(a) AS maxA"`), you would not have `a` twice in the input. Hence we need to build a new output `Row` that also includes the grouping fields (these must be copied from the first or last element). We need to check which order is assumed by Calcite (I would assume something like `<gKey1, gKey2, ..., agg1, agg2, ...>`).
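
A small sketch of building a separate output record. Rows are modelled as `Array[Any]` instead of Flink's `Row`, the helper `buildOutput` is hypothetical, and the layout `<gKey1, ..., agg1, ...>` is the assumption stated in the comment above, not a confirmed Calcite guarantee.

```scala
object OutputRowSketch {
  // Builds the output record for one group.
  // Assumed layout: grouping key values first, then one value per aggregate.
  def buildOutput(
      group: Seq[Array[Any]],
      groupingFields: Array[Int],
      aggregates: Seq[Seq[Array[Any]] => Any]): List[Any] = {
    require(group.nonEmpty, "a group always contains at least one record")
    // Grouping keys are identical within a group, so copy them from the first record.
    val keys = groupingFields.map(i => group.head(i)).toList
    val aggs = aggregates.map(f => f(group)).toList
    keys ++ aggs
  }
}
```

For `min(a), max(a)` grouped by one key, a two-field input record yields a three-field output record, which is why the input record cannot simply be reused.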



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by ChengXiangLi <gi...@git.apache.org>.
Github user ChengXiangLi closed the pull request at:

    https://github.com/apache/flink/pull/1567



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1567#discussion_r51723439
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions
    +
    +import java.lang.Iterable
    +
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.table.plan.functions.aggregate.Aggregate
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    + * A wrapper Flink GroupReduceOperator UDF of aggregates, it takes the grouped data as input,
    + * feed to the aggregates, and collect the record with aggregated value.
    + *
    + * @param aggregates Sql aggregate functions.
    + * @param fields  The grouped keys' index.
    + */
    +class AggregateFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val fields: Array[Int]) extends RichGroupReduceFunction[Any, Any] {
    --- End diff --
    
    I think for the first version it is fine to implement for `Row` only.
    Later, we can add more efficient types and use code generation for aggregation functions.



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1567#discussion_r51726723
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions.aggregate
    +
    +abstract class MaxAggregate[T] extends Aggregate[T]{
    +
    +}
    +
    +class TinyIntMaxAggregate extends MaxAggregate[Byte] {
    +  private var max = Byte.MaxValue
    +
    +  override def initiateAggregate: Unit = {
    +    max = Byte.MaxValue
    --- End diff --
    
    Initialize with `Byte.MinValue`.



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1567#discussion_r51732524
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala ---
    @@ -46,12 +57,93 @@ class DataSetJoinRule
           convRight,
           rel.getRowType,
           join.toString,
    -      Array[Int](),
    -      Array[Int](),
    -      JoinType.INNER,
    +      joinKeys._1,
    +      joinKeys._2,
    +      sqlJoinTypeToFlinkJoinType(join.getJoinType),
           null,
           null)
       }
    +
    +  private def getJoinKeys(join: FlinkJoin): (Array[Int], Array[Int]) = {
    +    val joinKeys = ArrayBuffer.empty[(Int, Int)]
    +    parseJoinRexNode(join.getCondition.asInstanceOf[RexCall], joinKeys)
    +
    +    val joinedRowType= join.getRowType
    +    val leftRowType = join.getLeft.getRowType
    +    val rightRowType = join.getRight.getRowType
    +
    +    // The fetched join key index from Calcite is based on joined row type, we need
    +    // the join key index based on left/right input row type.
    +    val joinKeyPairs: ArrayBuffer[(Int, Int)] = joinKeys.map {
    +      case (first, second) =>
    +        var leftIndex = findIndexInSingleInput(first, joinedRowType, leftRowType)
    +        if (leftIndex == -1) {
    +          leftIndex = findIndexInSingleInput(second, joinedRowType, leftRowType)
    +          if (leftIndex == -1) {
    +            throw new PlanGenException("Invalid join condition, could not find " +
    +              joinedRowType.getFieldNames.get(first) + " and " +
    +              joinedRowType.getFieldNames.get(second) + " in left table")
    +          }
    +          val rightIndex = findIndexInSingleInput(first, joinedRowType, rightRowType)
    +          if (rightIndex == -1) {
    +            throw new PlanGenException("Invalid join condition could not find " +
    +              joinedRowType.getFieldNames.get(first) + " in right table")
    +          }
    +          (leftIndex, rightIndex)
    +        } else {
    +          val rightIndex = findIndexInSingleInput(second, joinedRowType, rightRowType)
    +          if (rightIndex == -1) {
    +            throw new PlanGenException("Invalid join condition could not find " +
    +              joinedRowType.getFieldNames.get(second) + " in right table")
    +          }
    +          (leftIndex, rightIndex)
    +        }
    +    }
    +
    +    val joinKeysPair = joinKeyPairs.unzip
    +
    +    (joinKeysPair._1.toArray, joinKeysPair._2.toArray)
    +  }
    +
    +  // Parse the join condition recursively, find all the join keys' index.
    +  private def parseJoinRexNode(condition: RexCall, joinKeys: ArrayBuffer[(Int, Int)]): Unit = {
    --- End diff --
    
    We should extract all conjunctive equality conditions and ignore all other conditions. If there are no conjunctive equality conditions, we should generate a data set cross. All non-equality conditions need to be evaluated in a join or cross function.
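
One way to picture this extraction, using a tiny hypothetical expression tree in place of Calcite's `RexNode` (`Cond`, `AndCond`, `EqCond`, `OtherCond` and `JoinConditionSketch` are illustrative names only): flatten the nested ANDs, keep the equality conjuncts as join keys, and hand everything else to a join or cross function.

```scala
// Hypothetical, simplified expression tree; the PR works on Calcite RexNode instead.
sealed trait Cond
case class AndCond(left: Cond, right: Cond) extends Cond
case class EqCond(leftField: Int, rightField: Int) extends Cond
// Any predicate that is not a top-level equality conjunct (e.g. <, OR, ...).
case class OtherCond(description: String) extends Cond

object JoinConditionSketch {
  // Flattens nested ANDs into the list of conjuncts.
  def conjuncts(c: Cond): List[Cond] = c match {
    case AndCond(l, r) => conjuncts(l) ++ conjuncts(r)
    case other => List(other)
  }

  // Splits a condition into (equality key pairs, remaining predicates).
  def split(c: Cond): (List[(Int, Int)], List[Cond]) = {
    val (eqs, rest) = conjuncts(c).partition(_.isInstanceOf[EqCond])
    (eqs.collect { case EqCond(l, r) => (l, r) }, rest)
  }

  // Without any equality conjunct there are no join keys, so a cross is needed.
  def useCross(c: Cond): Boolean = split(c)._1.isEmpty
}
```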



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1567#discussion_r51722041
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions
    +
    +import java.lang.Iterable
    +
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.table.plan.functions.aggregate.Aggregate
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    + * A wrapper Flink GroupReduceOperator UDF of aggregates, it takes the grouped data as input,
    + * feed to the aggregates, and collect the record with aggregated value.
    + *
    + * @param aggregates Sql aggregate functions.
    + * @param fields  The grouped keys' index.
    + */
    +class AggregateFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val fields: Array[Int]) extends RichGroupReduceFunction[Any, Any] {
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(fields)
    +    Preconditions.checkArgument(aggregates.size == fields.size)
    +
    +    aggregates.foreach(_.initiateAggregate)
    +  }
    +
    +  override def reduce(records: Iterable[Any], out: Collector[Any]): Unit = {
    +    var currentValue: Any = null
    +
    +    // iterate all input records, feed to each aggregate.
    +    val aggregateAndField = aggregates.zip(fields)
    --- End diff --
    
    This can be done in the `open()` method.
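
    A rough sketch of what moving the `zip` into `open()` could look like. To stay self-contained it does not depend on Flink: `SimpleAggregate`, `CountAggregate` and `AggregateRunner` are hypothetical stand-ins that only mimic the `open()`/`reduce()` lifecycle.

```scala
// Hypothetical stand-in for the PR's Aggregate; only what the sketch needs.
trait SimpleAggregate {
  def initiateAggregate(): Unit
  def aggregate(value: Any): Unit
  def getAggregated: Any
}

class CountAggregate extends SimpleAggregate {
  private var count = 0L
  override def initiateAggregate(): Unit = { count = 0L }
  override def aggregate(value: Any): Unit = { count += 1 }
  override def getAggregated: Any = count
}

// Mimics the open()/reduce() lifecycle without depending on Flink.
class AggregateRunner(aggregates: Array[SimpleAggregate], fields: Array[Int]) {
  // Built once in open() and reused by every reduce() call.
  private var aggregateAndField: Array[(SimpleAggregate, Int)] = _

  def open(): Unit = {
    require(aggregates.length == fields.length)
    aggregateAndField = aggregates.zip(fields)
  }

  def reduce(records: Iterable[Array[Any]]): Array[Any] = {
    // Reset the aggregates for the new group.
    aggregateAndField.foreach(_._1.initiateAggregate())
    // Feed the configured field of every record to its aggregate.
    records.foreach { record =>
      aggregateAndField.foreach { case (agg, field) => agg.aggregate(record(field)) }
    }
    aggregateAndField.map(_._1.getAggregated)
  }
}
```

    The pairing is then computed once per task instead of once per group.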



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1567#discussion_r51727237
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/SumAggregate.scala ---
    @@ -0,0 +1,130 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions.aggregate
    +
    +abstract class SumAggregate[T] extends Aggregate[T]{
    +
    +}
    +
    +// TinyInt sum aggregate return Int as aggregated value.
    --- End diff --
    
    The comment says the aggregate returns `Int`, but the code converts the return value to `Byte`. The return type is defined in the Calcite `AggregateCall`.
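
    To illustrate why the declared return type matters, here is a small sketch (the object and method names are hypothetical) contrasting a sum kept as `Int` with one narrowed to `Byte`.

```scala
object TinyIntSum {
  // Accumulates in Int, so the sum of a few bytes does not wrap.
  def sumAsInt(values: Seq[Byte]): Int = values.foldLeft(0)(_ + _)

  // Narrowing the final sum to Byte silently wraps outside [-128, 127].
  def sumAsByte(values: Seq[Byte]): Byte = sumAsInt(values).toByte
}
```

    For the inputs 100 and 100, `sumAsInt` yields 200 while `sumAsByte` wraps around to a negative value, so the code comment and the actual return type should agree with what the `AggregateCall` declares.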



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1567#issuecomment-179763839
  
    @ChengXiangLi told me he cannot continue with this PR for a couple of days. Since we agreed to finish FLINK-3221 as soon as possible, I think it is OK if somebody else picks up this PR and drives it forward.



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1567#discussion_r51725411
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/AggregateFactory.scala ---
    @@ -0,0 +1,135 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions.aggregate
    +
    +import java.util
    +
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.fun._
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.table.plan.PlanGenException
    +import org.apache.flink.api.table.plan.functions.AggregateFunction
    +
    +object AggregateFactory {
    +
    +  def createAggregateInstance(aggregateCalls: Seq[AggregateCall]):
    +    RichGroupReduceFunction[Any, Any] = {
    +
    +    val fieldIndexes = new Array[Int](aggregateCalls.size)
    +    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)
    +    aggregateCalls.zipWithIndex.map { case (aggregateCall, index) =>
    +      val sqlType = aggregateCall.getType
    --- End diff --
    
    According to the `AggregateCall` documentation, `getType` returns the result type of the aggregate, not the input type.



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1567#discussion_r51723549
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/FunctionUtils.scala ---
    @@ -0,0 +1,37 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions
    +
    +import org.apache.flink.api.table.Row
    +
    +object FunctionUtils {
    --- End diff --
    
    If we go for a `Row` only implementation, we do not need this class.



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1567#issuecomment-182959195
  
    Hey @ChengXiangLi,
    I merged your changes together with #1600. You can close this.



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1567#discussion_r51726774
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/aggregate/MaxAggregate.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions.aggregate
    +
    +abstract class MaxAggregate[T] extends Aggregate[T]{
    +
    +}
    +
    +class TinyIntMaxAggregate extends MaxAggregate[Byte] {
    +  private var max = Byte.MaxValue
    +
    +  override def initiateAggregate: Unit = {
    +    max = Byte.MaxValue
    +  }
    +
    +  override def aggregate(value: Any): Unit = {
    +    val current = value.asInstanceOf[Byte]
    +    if (current < max) {
    --- End diff --
    
    should be `if (current > max)`
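
    A sketch of the corrected aggregate as a standalone class (`TinyIntMax` is a hypothetical name; the PR's class extends `MaxAggregate[Byte]`). Seeding with `Byte.MinValue` is an assumption on my side and not part of the comment above: with `current > max`, a seed of `Byte.MaxValue` would never be replaced.

```scala
// Hypothetical standalone version of a Byte max aggregate.
class TinyIntMax {
  // Start from the smallest Byte so that any input can replace it.
  private var max: Byte = Byte.MinValue

  def initiateAggregate(): Unit = {
    max = Byte.MinValue
  }

  def aggregate(value: Any): Unit = {
    val current = value.asInstanceOf[Byte]
    // Keep the larger of the running maximum and the new value.
    if (current > max) {
      max = current
    }
  }

  def getAggregated: Byte = max
}
```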



[GitHub] flink pull request: [Flink-3226] Translate logical plan into physi...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1567#discussion_r51737319
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/functions/AggregateFunction.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.plan.functions
    +
    +import java.lang.Iterable
    +
    +import com.google.common.base.Preconditions
    +import org.apache.flink.api.common.functions.RichGroupReduceFunction
    +import org.apache.flink.api.table.plan.functions.aggregate.Aggregate
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConversions._
    +
    +/**
    + * A wrapper Flink GroupReduceOperator UDF of aggregates, it takes the grouped data as input,
    + * feed to the aggregates, and collect the record with aggregated value.
    + *
    + * @param aggregates Sql aggregate functions.
    + * @param fields  The grouped keys' index.
    + */
    +class AggregateFunction(
    +    private val aggregates: Array[Aggregate[_ <: Any]],
    +    private val fields: Array[Int]) extends RichGroupReduceFunction[Any, Any] {
    +
    +  override def open(config: Configuration) {
    +    Preconditions.checkNotNull(aggregates)
    +    Preconditions.checkNotNull(fields)
    +    Preconditions.checkArgument(aggregates.size == fields.size)
    +
    +    aggregates.foreach(_.initiateAggregate)
    +  }
    +
    +  override def reduce(records: Iterable[Any], out: Collector[Any]): Unit = {
    +    var currentValue: Any = null
    +
    +    // iterate all input records, feed to each aggregate.
    +    val aggregateAndField = aggregates.zip(fields)
    +    records.foreach {
    +      value =>
    +        currentValue = value
    +        aggregateAndField.foreach {
    --- End diff --
    
    I ran a quick microbenchmark comparing Scala's `foreach` with pattern matching against a simple `while` loop, and the `while` loop was about 6x faster. I'm not sure how much this affects overall performance, but since the reduce function is on the hot path, I would change this. I will do the same in my PR #1579.
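
    For reference, a sketch of the two loop shapes being compared. The object and method names are hypothetical, and the snippet only shows the rewrite; it does not reproduce the benchmark or its numbers.

```scala
object HotLoop {
  // foreach with a pattern-matching closure, as in the quoted reduce().
  def sumWithForeach(pairs: Array[(Int, Int)]): Long = {
    var total = 0L
    pairs.foreach { case (a, b) => total += a * b }
    total
  }

  // Index-based while loop: avoids a closure invocation per element.
  def sumWithWhile(pairs: Array[(Int, Int)]): Long = {
    var total = 0L
    var i = 0
    while (i < pairs.length) {
      val p = pairs(i)
      total += p._1 * p._2
      i += 1
    }
    total
  }
}
```

    Both methods compute the same result, so the `while` version could be swapped in without changing behavior.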

