You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by vasia <gi...@git.apache.org> on 2016/03/09 15:46:51 UTC

[GitHub] flink pull request: [FLINK-3596] DataSet RelNode refactoring

GitHub user vasia opened a pull request:

    https://github.com/apache/flink/pull/1777

    [FLINK-3596] DataSet RelNode refactoring

    This PR refactors the current table API code so that
    
    - the intermediate flink relnode layer and the dataset rules are removed
    - code generation is moved from rules to DataSet nodes
    - unused DataSete nodes are removed
    - DataSetMap and DataSetReduce are merged into DataSetAggregate

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vasia/flink refactor-table

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1777.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1777
    
----
commit 249f7f5ecab79c6b5b214c37d7e2892f6efe3628
Author: vasia <va...@apache.org>
Date:   2016-03-08T16:29:32Z

    [FLINK-3596] DataSet RelNode refactoring
    
    - remove the intermediate flink relnode layer and the dataset rules
    
    - move code generation from rules to DataSet nodes
    
    - remove unused DataSete nodes
    
    - move code generation from join rule to DataSetJoin node
    
    - merge DataSetMap and DataSetReduce into  DataSetAggregate

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3596] DataSet RelNode refactoring

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1777#issuecomment-194833766
  
    Thanks for the review @fhueske. I've addressed your comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3596] DataSet RelNode refactoring

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1777#issuecomment-195093887
  
    Thanks for the update @vasia.
    Merged to `tableOnCalcite`. PR can be closed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3596] DataSet RelNode refactoring

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1777#discussion_r55544962
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala ---
    @@ -69,37 +72,55 @@ class DataSetGroupReduce(
     
         expectedType match {
           case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
    -        throw new PlanGenException("GroupReduce operations currently only support returning Rows.")
    +        throw new PlanGenException("Aggregate operations currently only support returning Rows.")
           case _ => // ok
         }
     
    +    val groupingKeys = (0 until grouping.length).toArray
    +    // add grouping fields, position keys in the input, and input type
    +    val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
    +      inputType, rowType, grouping)
    +
         val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
           config,
           // tell the input operator that this operator currently only supports Rows as input
           Some(TypeConverter.DEFAULT_ROW_TYPE))
     
    +    val intermediateType = determineReturnType(
    +      aggregateResult.intermediateDataType,
    +      expectedType,
    +      config.getNullCheck,
    +      config.getEfficientTypeUsage)
    +
    +
         // get the output types
    -    val fieldsNames = rowType.getFieldNames
         val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
         .map(f => f.getType.getSqlTypeName)
         .map(n => TypeConverter.sqlTypeToTypeInfo(n))
         .toArray
     
         val rowTypeInfo = new RowTypeInfo(fieldTypes)
    +
    +    val mappedInput = inputDS.map(aggregateResult.mapFunc.apply(
    --- End diff --
    
    We could refactor the `AggregateUtil` such that it returns the `MapFunction` directly and not a function to generate the `MapFunction`. I think would not need to expose the `intermediateDataType` of the `aggregateResult`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3596] DataSet RelNode refactoring

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1777#discussion_r55545944
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.functions.FlatMapFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.codegen.CodeGenerator
    +import org.apache.flink.api.table.plan.TypeConverter._
    +import org.apache.flink.api.table.plan.TypeConverter
    +import org.apache.flink.api.table.runtime.FlatMapRunner
    +import org.apache.flink.api.table.{TableException, Row, TableConfig}
    +import org.apache.calcite.rex.RexProgram
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Flink RelNode which matches along with LogicalCalc.
    +  *
    +  */
    +class DataSetCalc(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    input: RelNode,
    +    rowType: RelDataType,
    +    calcProgram: RexProgram,
    +    calcRowType: RelDataType,
    +    opName: String,
    +    ruleDescription: String)
    +  extends SingleRel(cluster, traitSet, input)
    +  with DataSetRel {
    +
    +  override def deriveRowType() = rowType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataSetCalc(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      rowType,
    +      calcProgram,
    +      calcRowType,
    +      opName,
    +      ruleDescription)
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw).item("name", opName)
    +  }
    +
    +  override def toString = opName
    +
    +  override def translateToPlan(config: TableConfig,
    +      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
    +
    +    expectedType match {
    --- End diff --
    
    I think this match expression should be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3596] DataSet RelNode refactoring

Posted by vasia <gi...@git.apache.org>.
Github user vasia closed the pull request at:

    https://github.com/apache/flink/pull/1777


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3596] DataSet RelNode refactoring

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1777#discussion_r55546208
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala ---
    @@ -0,0 +1,165 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.api.common.functions.FlatMapFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.codegen.CodeGenerator
    +import org.apache.flink.api.table.plan.TypeConverter._
    +import org.apache.flink.api.table.plan.TypeConverter
    +import org.apache.flink.api.table.runtime.FlatMapRunner
    +import org.apache.flink.api.table.{TableException, Row, TableConfig}
    +import org.apache.calcite.rex.RexProgram
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Flink RelNode which matches along with LogicalCalc.
    +  *
    +  */
    +class DataSetCalc(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    input: RelNode,
    +    rowType: RelDataType,
    +    calcProgram: RexProgram,
    +    calcRowType: RelDataType,
    +    opName: String,
    +    ruleDescription: String)
    +  extends SingleRel(cluster, traitSet, input)
    +  with DataSetRel {
    +
    +  override def deriveRowType() = rowType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataSetCalc(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      rowType,
    +      calcProgram,
    +      calcRowType,
    +      opName,
    +      ruleDescription)
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw).item("name", opName)
    +  }
    +
    +  override def toString = opName
    +
    +  override def translateToPlan(config: TableConfig,
    +      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
    +
    +    expectedType match {
    +      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
    +        throw new TableException("GroupReduce operations " +
    +            "currently only support returning Rows.")
    +      case _ => // ok
    +    }
    +
    +    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
    +      config,
    +      // tell the input operator that this operator currently only supports Rows as input
    +      Some(TypeConverter.DEFAULT_ROW_TYPE))
    --- End diff --
    
    Remove this parameter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3596] DataSet RelNode refactoring

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1777#issuecomment-195241544
  
    Thanks for finalizing and merging :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3596] DataSet RelNode refactoring

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1777#discussion_r55545155
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala ---
    @@ -69,37 +72,55 @@ class DataSetGroupReduce(
     
         expectedType match {
           case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
    -        throw new PlanGenException("GroupReduce operations currently only support returning Rows.")
    +        throw new PlanGenException("Aggregate operations currently only support returning Rows.")
           case _ => // ok
         }
     
    +    val groupingKeys = (0 until grouping.length).toArray
    +    // add grouping fields, position keys in the input, and input type
    +    val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
    +      inputType, rowType, grouping)
    +
         val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
           config,
           // tell the input operator that this operator currently only supports Rows as input
           Some(TypeConverter.DEFAULT_ROW_TYPE))
     
    +    val intermediateType = determineReturnType(
    +      aggregateResult.intermediateDataType,
    +      expectedType,
    +      config.getNullCheck,
    +      config.getEfficientTypeUsage)
    +
    +
         // get the output types
    -    val fieldsNames = rowType.getFieldNames
         val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
         .map(f => f.getType.getSqlTypeName)
         .map(n => TypeConverter.sqlTypeToTypeInfo(n))
         .toArray
     
         val rowTypeInfo = new RowTypeInfo(fieldTypes)
    +
    +    val mappedInput = inputDS.map(aggregateResult.mapFunc.apply(
    +      config, inputDS.getType, intermediateType))
    +
         val groupReduceFunction =
    -      func.apply(config, inputDS.getType.asInstanceOf[RowTypeInfo], rowTypeInfo)
    +      aggregateResult.reduceGroupFunc.apply(
    --- End diff --
    
    `AggregateUtil` can directly return the `GroupReduceFunction`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---