You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:49 UTC
[20/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
new file mode 100644
index 0000000..fc69493
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression, GeneratedFunction}
+import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
+import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.runtime.FlatMapRunner
+import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.table.api.{TableConfig, TableException}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Code generation and naming helpers for joining rows with a user-defined table function.
+ */
+trait FlinkCorrelate {
+ /** Generates code that calls rexCall and collects a result per returned row passing condition. */
+ private[flink] def functionBody(
+ generator: CodeGenerator,
+ udtfTypeInfo: TypeInformation[Any],
+ rowType: RelDataType,
+ rexCall: RexCall,
+ condition: Option[RexNode],
+ config: TableConfig,
+ joinType: SemiJoinType,
+ expectedType: Option[TypeInformation[Any]]): String = {
+
+ val returnType = determineReturnType(
+ rowType,
+ expectedType,
+ config.getNullCheck,
+ config.getEfficientTypeUsage)
+
+ val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
+
+ val call = generator.generateExpression(rexCall)
+ var body =
+ s"""
+ |${call.code}
+ |java.util.Iterator iter = ${call.resultTerm}.getRowsIterator();
+ """.stripMargin
+ // decide what the generated code does when the iterator has no rows
+ if (joinType == SemiJoinType.INNER) {
+ // cross join: an empty iterator produces no output for this input
+ body +=
+ s"""
+ |if (!iter.hasNext()) {
+ | return;
+ |}
+ """.stripMargin
+ } else if (joinType == SemiJoinType.LEFT) {
+ // left outer join: an empty iterator still produces one result row
+
+ // in case of left outer join and the returned row of table function is empty,
+ // fill all fields of row with null
+ val input2NullExprs = input2AccessExprs.map { x =>
+ GeneratedExpression(
+ primitiveDefaultValue(x.resultType),
+ ALWAYS_NULL,
+ NO_CODE,
+ x.resultType)
+ }
+ val outerResultExpr = generator.generateResultExpression(
+ input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames.asScala)
+ body +=
+ s"""
+ |if (!iter.hasNext()) {
+ | ${outerResultExpr.code}
+ | ${generator.collectorTerm}.collect(${outerResultExpr.resultTerm});
+ | return;
+ |}
+ """.stripMargin
+ } else {
+ throw TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
+ }
+ // result expression built from the access expressions of both inputs
+ val crossResultExpr = generator.generateResultExpression(
+ input1AccessExprs ++ input2AccessExprs,
+ returnType,
+ rowType.getFieldNames.asScala)
+
+ val projection = if (condition.isEmpty) {
+ s"""
+ |${crossResultExpr.code}
+ |${generator.collectorTerm}.collect(${crossResultExpr.resultTerm});
+ """.stripMargin
+ } else {
+ val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo)
+ filterGenerator.input1Term = filterGenerator.input2Term
+ val filterCondition = filterGenerator.generateExpression(condition.get)
+ s"""
+ |${filterGenerator.reuseInputUnboxingCode()}
+ |${filterCondition.code}
+ |if (${filterCondition.resultTerm}) {
+ | ${crossResultExpr.code}
+ | ${generator.collectorTerm}.collect(${crossResultExpr.resultTerm});
+ |}
+ |""".stripMargin
+ }
+ // the generated loop casts each iterator element to the UDTF output class and projects it
+ val outputTypeClass = udtfTypeInfo.getTypeClass.getCanonicalName
+ body +=
+ s"""
+ |while (iter.hasNext()) {
+ | $outputTypeClass ${generator.input2Term} = ($outputTypeClass) iter.next();
+ | $projection
+ |}
+ """.stripMargin
+ body
+ }
+ /** Creates a [[FlatMapRunner]] from the name, code and return type of genFunction. */
+ private[flink] def correlateMapFunction(
+ genFunction: GeneratedFunction[FlatMapFunction[Any, Any]])
+ : FlatMapRunner[Any, Any] = {
+
+ new FlatMapRunner[Any, Any](
+ genFunction.name,
+ genFunction.code,
+ genFunction.returnType)
+ }
+ /** Returns the field names of rowType separated by ",". */
+ private[flink] def selectToString(rowType: RelDataType): String = {
+ rowType.getFieldNames.asScala.mkString(",")
+ }
+ /** Composes the operator name from correlateToString and selectToString. */
+ private[flink] def correlateOpName(
+ rexCall: RexCall,
+ sqlFunction: TableSqlFunction,
+ rowType: RelDataType)
+ : String = {
+
+ s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: ${selectToString(rowType)}"
+ }
+ /** Formats the call as "table(name(operands))" with the operands separated by ",". */
+ private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = {
+ val udtfName = sqlFunction.getName
+ val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",")
+ s"table($udtfName($operands))"
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
new file mode 100644
index 0000000..c8211a2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rex._
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.MapRunner
+
+import scala.collection.JavaConversions._
+
+/** Expression rendering and type conversion helpers shared by Flink RelNodes. */
+trait FlinkRel {
+
+  /**
+    * Renders a Calcite expression as a string.
+    *
+    * @param expr            expression to render
+    * @param inFields        names substituted for input references, looked up by index
+    * @param localExprsTable expressions substituted for local references, if available
+    * @return the rendered expression
+    */
+  private[flink] def getExpressionString(
+      expr: RexNode,
+      inFields: List[String],
+      localExprsTable: Option[List[RexNode]]): String = {
+
+    // recursive rendering with the same field names and local expression table
+    def render(node: RexNode): String = getExpressionString(node, inFields, localExprsTable)
+
+    expr match {
+      case inputRef: RexInputRef => inFields(inputRef.getIndex)
+      case literal: RexLiteral => literal.toString
+      case localRef: RexLocalRef =>
+        localExprsTable match {
+          case Some(table) => render(table(localRef.getIndex))
+          case None =>
+            throw new IllegalArgumentException(
+              "Encountered RexLocalRef without local expression table")
+        }
+      case call: RexCall =>
+        val operatorName = call.getOperator.toString
+        val renderedOperands = call.getOperands.map(render).mkString(", ")
+        s"$operatorName($renderedOperands)"
+      case access: RexFieldAccess =>
+        s"${render(access.getReferenceExpr)}.${access.getField.getName}"
+      case _ =>
+        throw new IllegalArgumentException(s"Unknown expression type '${expr.getClass}': $expr")
+    }
+  }
+
+  /**
+    * Generates a [[MapFunction]] that converts records of inputType into expectedType.
+    *
+    * The generated function code is wrapped in a [[MapRunner]].
+    */
+  private[flink] def getConversionMapper(
+      config: TableConfig,
+      nullableInput: Boolean,
+      inputType: TypeInformation[Any],
+      expectedType: TypeInformation[Any],
+      conversionOperatorName: String,
+      fieldNames: Seq[String],
+      inputPojoFieldMapping: Option[Array[Int]] = None)
+    : MapFunction[Any, Any] = {
+
+    val codeGen = new CodeGenerator(config, nullableInput, inputType, None, inputPojoFieldMapping)
+    val converted = codeGen.generateConverterResultExpression(expectedType, fieldNames)
+
+    // the generated map function runs the conversion code and returns its result term
+    val mapperBody =
+      s"""
+        |${converted.code}
+        |return ${converted.resultTerm};
+        |""".stripMargin
+
+    val generated = codeGen.generateFunction(
+      conversionOperatorName,
+      classOf[MapFunction[Any, Any]],
+      mapperBody,
+      expectedType)
+
+    new MapRunner[Any, Any](generated.name, generated.code, generated.returnType)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
new file mode 100644
index 0000000..252bb2e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.PojoTypeInfo
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.plan.schema.FlinkTable
+import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/** Base class of batch scans; can convert the scanned DataSet into the expected type. */
+abstract class BatchScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable)
+  extends TableScan(cluster, traitSet, table)
+  with DataSetRel {
+
+  override def toString: String = {
+    val fields = getRowType.getFieldNames.asScala.toList
+    s"Source(from: (${fields.mkString(", ")}))"
+  }
+
+  /** Uses the row count as the first two cost components and 0 as the third one. */
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val rows = metadata.getRowCount(this)
+    planner.getCostFactory.makeCost(rows, rows, 0)
+  }
+
+  /** Forwards input unchanged when possible, otherwise appends a conversion mapper. */
+  protected def convertToExpectedType(
+      input: DataSet[Any],
+      flinkTable: FlinkTable[_],
+      expectedType: Option[TypeInformation[Any]],
+      config: TableConfig): DataSet[Any] = {
+
+    val inputType = input.getType
+
+    // special case: with efficient type usage and no expected type the DataSet is simply
+    // forwarded; PojoTypes are excluded as their fields don't have an order
+    val forwardDirectly = expectedType.isEmpty &&
+      config.getEfficientTypeUsage &&
+      !inputType.isInstanceOf[PojoTypeInfo[_]]
+
+    if (forwardDirectly) {
+      input
+    } else {
+      val determinedType = determineReturnType(
+        getRowType,
+        expectedType,
+        config.getNullCheck,
+        config.getEfficientTypeUsage)
+
+      if (determinedType == inputType) {
+        // no conversion necessary, forward
+        input
+      } else {
+        // conversion
+        val fieldNames = getRowType.getFieldNames
+        val converter = getConversionMapper(
+          config,
+          nullableInput = false,
+          inputType,
+          determinedType,
+          "DataSetSourceConversion",
+          fieldNames,
+          Some(flinkTable.fieldIndexes))
+
+        val opName = s"from: (${fieldNames.asScala.toList.mkString(", ")})"
+        input.map(converter).name(opName)
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
new file mode 100644
index 0000000..09cb180
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.sources.BatchTableSource
+
+/** Flink RelNode that scans an external source exposed through a [[BatchTableSource]]. */
+class BatchTableSourceScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable,
+    val tableSource: BatchTableSource[_])
+  extends BatchScan(cluster, traitSet, table) {
+
+  /** Row type built from the field names and field types reported by the table source. */
+  override def deriveRowType() =
+    cluster.getTypeFactory
+      .asInstanceOf[FlinkTypeFactory]
+      .buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
+
+  /** Cost components: row count, row count, and row count times the estimated row size. */
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val rows = metadata.getRowCount(this)
+    val rowSize = estimateRowSize(getRowType)
+    planner.getCostFactory.makeCost(rows, rows, rows * rowSize)
+  }
+
+  /** Creates a scan over the same table and source with the given traits; inputs are unused. */
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode =
+    new BatchTableSourceScan(cluster, traitSet, getTable, tableSource)
+
+  /** Adds the field names of the table source as the "fields" item. */
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val fieldList = tableSource.getFieldsNames.mkString(", ")
+    super.explainTerms(pw).item("fields", fieldList)
+  }
+
+  /** Fetches the DataSet from the table source and converts it to the expected type. */
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+    val sourceDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]
+
+    convertToExpectedType(sourceDataSet, new TableSourceTable(tableSource), expectedType, config)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
new file mode 100644
index 0000000..a5c42d9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.runtime.aggregate.AggregateUtil
+import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+import org.apache.flink.table.typeutils.TypeConverter
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode for aggregations on DataSets; it matches along with a LogicalAggregate.
+  *
+  * The aggregation is executed as a prepare map followed by a group reduce.
+  */
+class DataSetAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    rowRelDataType: RelDataType,
+    inputType: RelDataType,
+    grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkAggregate
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  /** Creates the same aggregate on top of the first of the given inputs. */
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    val newInput = inputs.get(0)
+    new DataSetAggregate(
+      cluster, traitSet, newInput, namedAggregates, getRowType, inputType, grouping)
+  }
+
+  override def toString: String = {
+    // the groupBy part is only printed for grouped aggregations
+    val groupByPart =
+      if (grouping.nonEmpty) s"groupBy: (${groupingToString(inputType, grouping)}), " else ""
+    val selectPart = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
+    s"Aggregate(${groupByPart}select: ($selectPart))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val hasGrouping = grouping.nonEmpty
+    super.explainTerms(pw)
+      .itemIf("groupBy", groupingToString(inputType, grouping), hasGrouping)
+      .item("select", aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil))
+  }
+
+  /** Cost based on the input: row count, row count times aggregates, row count times row size. */
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val inputRel = getInput
+    val inputRows = metadata.getRowCount(inputRel)
+    val inputRowSize = estimateRowSize(inputRel.getRowType)
+    planner.getCostFactory.makeCost(
+      inputRows,
+      inputRows * namedAggregates.size,
+      inputRows * inputRowSize)
+  }
+
+  /**
+    * Translates the aggregation into a prepare map followed by a (grouped) group reduce and,
+    * if a non-Row type is expected, a final conversion map.
+    */
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+    val groupingKeys = grouping.indices.toArray
+
+    val prepareMapper = AggregateUtil.createPrepareMapFunction(
+      namedAggregates,
+      grouping,
+      inputType)
+
+    val aggregateReducer = AggregateUtil.createAggregateGroupReduceFunction(
+      namedAggregates,
+      inputType,
+      rowRelDataType,
+      grouping)
+
+    // tell the input operator that this operator currently only supports Rows as input
+    val inputDS = getInput.asInstanceOf[DataSetRel]
+      .translateToPlan(tableEnv, Some(TypeConverter.DEFAULT_ROW_TYPE))
+
+    // get the output types
+    val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala
+      .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
+      .toArray
+    val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
+
+    val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
+    val preparedRows = inputDS
+      .map(prepareMapper)
+      .name(s"prepare select: ($aggString)")
+      .asInstanceOf[DataSet[Row]]
+
+    val aggregated =
+      if (groupingKeys.nonEmpty) {
+        // grouped aggregation
+        val groupedOpName =
+          s"groupBy: (${groupingToString(inputType, grouping)}), select: ($aggString)"
+        preparedRows
+          .groupBy(groupingKeys: _*)
+          .reduceGroup(aggregateReducer)
+          .returns(rowTypeInfo)
+          .name(groupedOpName)
+          .asInstanceOf[DataSet[Any]]
+      } else {
+        // global aggregation
+        preparedRows
+          .reduceGroup(aggregateReducer)
+          .returns(rowTypeInfo)
+          .name(s"select:($aggString)")
+          .asInstanceOf[DataSet[Any]]
+      }
+
+    // if the expected type is not a Row, inject a mapper to convert to the expected type
+    expectedType match {
+      case Some(requestedType) if requestedType.getTypeClass != classOf[Row] =>
+        val outputFieldNames = getRowType.getFieldNames.asScala
+        val converter = getConversionMapper(
+          config = config,
+          nullableInput = false,
+          inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
+          expectedType = requestedType,
+          conversionOperatorName = "DataSetAggregateConversion",
+          fieldNames = outputFieldNames)
+        aggregated
+          .map(converter)
+          .name(s"convert: (${outputFieldNames.toList.mkString(", ")})")
+      case _ =>
+        aggregated
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
new file mode 100644
index 0000000..03178ad
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.nodes.FlinkCalc
+import org.apache.flink.table.typeutils.TypeConverter
+import TypeConverter._
+import org.apache.calcite.rex._
+import org.apache.flink.table.api.BatchTableEnvironment
+
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with LogicalCalc.
+  * The calc program is evaluated by a generated flat map function.
+  */
+class DataSetCalc(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    input: RelNode,
+    rowRelDataType: RelDataType,
+    private[flink] val calcProgram: RexProgram, // for tests
+    ruleDescription: String)
+  extends SingleRel(cluster, traitSet, input)
+  with FlinkCalc
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetCalc(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      getRowType,
+      calcProgram,
+      ruleDescription)
+  }
+
+  override def toString: String = calcToString(calcProgram, getExpressionString)
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .item("select", selectionToString(calcProgram, getExpressionString))
+      .itemIf("where",
+        conditionToString(calcProgram, getExpressionString),
+        calcProgram.getCondition != null)
+  }
+
+  /** Cost of the calc: input row count, and row count times the number of computations. */
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val child = this.getInput
+    val rowCnt = metadata.getRowCount(child)
+
+    // compute number of expressions that do not access a field or literal, i.e. computations,
+    // conditions, etc. We only want to account for computations, not for simple projections.
+    val compCnt = calcProgram.getExprList.asScala.toList.count {
+      case _: RexInputRef => false
+      case _: RexLiteral => false
+      case _ => true
+    }
+
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * compCnt, 0)
+  }
+
+  /** Estimated output rows: 75% of the input rows if there is a condition, but at least 1. */
+  override def estimateRowCount(metadata: RelMetadataQuery): Double = {
+    val rowCnt = metadata.getRowCount(this.getInput)
+
+    if (calcProgram.getCondition != null) {
+      // we reduce the result card to push filters down, but never below a single row
+      (rowCnt * 0.75).max(1.0)
+    } else {
+      rowCnt
+    }
+  }
+
+  /** Translates the calc into a flatMap with a generated function applied to the input. */
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+    val config = tableEnv.getConfig
+
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    val generator = new CodeGenerator(config, false, inputDS.getType)
+
+    val body = functionBody(
+      generator,
+      inputDS.getType,
+      getRowType,
+      calcProgram,
+      config,
+      expectedType)
+
+    val genFunction = generator.generateFunction(
+      ruleDescription,
+      classOf[FlatMapFunction[Any, Any]],
+      body,
+      returnType)
+
+    val mapFunc = calcMapFunction(genFunction)
+    inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetConvention.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetConvention.scala
new file mode 100644
index 0000000..1b7bab5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetConvention.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+
+/** Calcite calling convention named "DATASET" whose interface is [[DataSetRel]]. */
+class DataSetConvention extends Convention {
+
+  // identity of the convention
+  def getName: String = "DATASET"
+  def getInterface: Class[_] = classOf[DataSetRel]
+  def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE
+  override def toString: String = getName
+
+  // only this very instance satisfies the convention (reference equality)
+  def satisfies(`trait`: RelTrait): Boolean = this.eq(`trait`)
+
+  // both conversion hooks always answer false
+  override def canConvertConvention(toConvention: Convention): Boolean = false
+  override def useAbstractConvertersForConversion(
+      fromTraits: RelTraitSet, toTraits: RelTraitSet): Boolean = false
+
+  // nothing is registered with the planner
+  def register(planner: RelOptPlanner): Unit = ()
+}
+
+/** Companion object providing the [[DataSetConvention]] instance. */
+object DataSetConvention {
+  val INSTANCE: DataSetConvention = new DataSetConvention
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
new file mode 100644
index 0000000..fa1afc3
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.FlinkCorrelate
+import org.apache.flink.table.typeutils.TypeConverter._
+
+/**
+  * Flink RelNode which joins each row of its input with the rows produced by a
+  * user-defined table function.
+  *
+  * The table function call is taken from `scan`; the translation is a flatMap over the
+  * input DataSet that runs a code-generated function.
+  */
+class DataSetCorrelate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    scan: LogicalTableFunctionScan,
+    condition: Option[RexNode],
+    relRowType: RelDataType,
+    joinRowType: RelDataType,
+    joinType: SemiJoinType,
+    ruleDescription: String)
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkCorrelate
+  with DataSetRel {
+
+  /** The row type of this node is the one handed to the constructor. */
+  override def deriveRowType(): RelDataType = relRowType
+
+  /**
+    * Cost estimate derived from the input row count scaled by 1.5; that value is used as
+    * row count and CPU cost, and half of it as IO cost.
+    */
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val scaledRows = metadata.getRowCount(getInput) * 1.5
+    val costFactory = planner.getCostFactory
+    costFactory.makeCost(scaledRows, scaledRows, scaledRows * 0.5)
+  }
+
+  /** Creates a copy with the given traits and the first of the given inputs. */
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    val newInput = inputs.get(0)
+    new DataSetCorrelate(
+      cluster, traitSet, newInput, scan, condition,
+      relRowType, joinRowType, joinType, ruleDescription)
+  }
+
+  /** String form built from the table function call and its SQL function. */
+  override def toString: String = {
+    val call = scan.getCall.asInstanceOf[RexCall]
+    val tableFunction = call.getOperator.asInstanceOf[TableSqlFunction]
+    correlateToString(call, tableFunction)
+  }
+
+  /**
+    * Adds the invocation, the table function class, the row type, the join type and
+    * (only if defined) the condition to the plan explanation.
+    */
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val call = scan.getCall.asInstanceOf[RexCall]
+    val tableFunction = call.getOperator.asInstanceOf[TableSqlFunction]
+
+    val writer = super.explainTerms(pw)
+    writer
+      .item("invocation", scan.getCall)
+      .item("function", tableFunction.getTableFunction.getClass.getCanonicalName)
+      .item("rowType", relRowType)
+      .item("joinType", joinType)
+      .itemIf("condition", condition.orNull, condition.isDefined)
+  }
+
+  /**
+    * Translates this node into a flatMap on the translated input DataSet.
+    *
+    * @param tableEnv     environment providing the table configuration
+    * @param expectedType optional type the resulting DataSet should have
+    * @return the input DataSet with the generated correlate function applied
+    */
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]])
+    : DataSet[Any] = {
+
+    val tableConfig = tableEnv.getConfig
+    val resultType = determineReturnType(
+      getRowType,
+      expectedType,
+      tableConfig.getNullCheck,
+      tableConfig.getEfficientTypeUsage)
+
+    // the input is translated without requesting a specific type
+    val input = inputNode.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+    // pull the table function and its type information out of the scan
+    val call = scan.getCall.asInstanceOf[RexCall]
+    val tableFunction = call.getOperator.asInstanceOf[TableSqlFunction]
+    val fieldMapping = tableFunction.getPojoFieldMapping
+    val functionResultType = tableFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
+
+    val codeGen = new CodeGenerator(
+      tableConfig,
+      false,
+      input.getType,
+      Some(functionResultType),
+      None,
+      Some(fieldMapping))
+
+    val functionCode = functionBody(
+      codeGen,
+      functionResultType,
+      getRowType,
+      call,
+      condition,
+      tableConfig,
+      joinType,
+      expectedType)
+
+    val generated = codeGen.generateFunction(
+      ruleDescription,
+      classOf[FlatMapFunction[Any, Any]],
+      functionCode,
+      resultType)
+
+    val flatMapper = correlateMapFunction(generated)
+
+    val correlated = input.flatMap(flatMapper)
+    correlated.name(correlateOpName(call, tableFunction, relRowType))
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
new file mode 100644
index 0000000..332aa8a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.runtime.IntersectCoGroupFunction
+import org.apache.flink.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which translates an Intersect into a CoGroup operator over all fields
+  * of both inputs.
+  */
+class DataSetIntersect(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType,
+    all: Boolean)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  /** The row type of this node is the one handed to the constructor. */
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  /** Creates a copy with the given traits and the first two of the given inputs. */
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    val newLeft = inputs.get(0)
+    val newRight = inputs.get(1)
+    new DataSetIntersect(cluster, traitSet, newLeft, newRight, getRowType, all)
+  }
+
+  override def toString: String = {
+    s"Intersect(intersect: ($intersectSelectionToString))"
+  }
+
+  /** Adds the comma-separated field list to the plan explanation. */
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val writer = super.explainTerms(pw)
+    writer.item("intersect", intersectSelectionToString)
+  }
+
+  /**
+    * Sums, over all inputs, a cost whose row count and CPU part is the input's row count
+    * and whose IO part is the row count multiplied by the estimated row size.
+    */
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val zeroCost = planner.getCostFactory.makeCost(0, 0, 0)
+    this.getInputs.foldLeft(zeroCost) { (accumulated, input) =>
+      val inputRows = metadata.getRowCount(input)
+      val inputRowSize = this.estimateRowSize(input.getRowType)
+      val inputCost = planner.getCostFactory.makeCost(
+        inputRows, inputRows, inputRows * inputRowSize)
+      accumulated.plus(inputCost)
+    }
+  }
+
+  /**
+    * Translates this node into a CoGroup of both translated inputs, followed by a type
+    * conversion map when the determined return type differs from the left input type.
+    *
+    * @param tableEnv     environment providing the table configuration
+    * @param expectedType optional type the resulting DataSet should have
+    */
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+    val coGrouped = leftDataSet.coGroup(rightDataSet)
+
+    val intersectOpName = s"intersect: ($intersectSelectionToString)"
+    val intersectFunction = new IntersectCoGroupFunction[Any](all)
+
+    // co-group on every field of both sides
+    val intersectDs = coGrouped
+      .where("*")
+      .equalTo("*")
+      .`with`(intersectFunction)
+      .name(intersectOpName)
+
+    val config = tableEnv.getConfig
+    // only the left type matters because the emitted records come from the left DataSet
+    val leftType = leftDataSet.getType
+
+    if (expectedType.isEmpty && config.getEfficientTypeUsage) {
+      // no type was requested and efficient types are enabled: forward the result as is
+      intersectDs
+    } else {
+      val determinedType = determineReturnType(
+        getRowType,
+        expectedType,
+        config.getNullCheck,
+        config.getEfficientTypeUsage)
+
+      if (determinedType == leftType) {
+        // no conversion necessary, forward
+        intersectDs
+      } else {
+        // conversion
+        val converter = getConversionMapper(
+          config = config,
+          nullableInput = false,
+          inputType = leftType,
+          expectedType = determinedType,
+          conversionOperatorName = "DataSetIntersectConversion",
+          fieldNames = getRowType.getFieldNames)
+
+        val convertOpName = s"convert: ($intersectSelectionToString)"
+
+        intersectDs.map(converter).name(convertOpName)
+      }
+    }
+  }
+
+  /** Comma-separated names of the fields of this node's row type. */
+  private def intersectSelectionToString: String = {
+    val fieldNames = getRowType.getFieldNames.asScala.toList
+    fieldNames.mkString(", ")
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
new file mode 100644
index 0000000..324e949
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.FlatJoinRunner
+import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * Flink RelNode which translates a join with at least one equality predicate into a
+  * DataSet join operator running a code-generated join function.
+  */
+class DataSetJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType,
+    joinCondition: RexNode,
+    joinRowType: RelDataType,
+    joinInfo: JoinInfo,
+    keyPairs: List[IntPair],
+    joinType: JoinRelType,
+    joinHint: JoinHint,
+    ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  /** The row type of this node is the one handed to the constructor. */
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  /** Creates a copy with the given traits and the first two of the given inputs. */
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    val newLeft = inputs.get(0)
+    val newRight = inputs.get(1)
+    new DataSetJoin(
+      cluster, traitSet, newLeft, newRight, getRowType, joinCondition,
+      joinRowType, joinInfo, keyPairs, joinType, joinHint, ruleDescription)
+  }
+
+  override def toString: String = {
+    s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+  }
+
+  /** Adds the join condition, the selected fields and the join type to the explanation. */
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val writer = super.explainTerms(pw)
+    writer
+      .item("where", joinConditionToString)
+      .item("join", joinSelectionToString)
+      .item("joinType", joinTypeToString)
+  }
+
+  /**
+    * Cost estimate: row count and CPU cost are the sum of both input row counts, IO cost
+    * is the sum of each input's row count multiplied by its estimated row size.
+    */
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+
+    val leftRowCnt = metadata.getRowCount(getLeft)
+    val rightRowCnt = metadata.getRowCount(getRight)
+
+    val leftRowSize = estimateRowSize(getLeft.getRowType)
+    val rightRowSize = estimateRowSize(getRight.getRowType)
+
+    val totalRowCnt = leftRowCnt + rightRowCnt
+    val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
+
+    planner.getCostFactory.makeCost(totalRowCnt, totalRowCnt, ioCost)
+  }
+
+  /**
+    * Translates this node into a DataSet join on the equality keys.
+    *
+    * A [[TableException]] is thrown if there is no equality key pair, if a key pair has
+    * different SQL types on both sides, or if an outer join is requested while the null
+    * check of the table configuration is disabled.
+    *
+    * @param tableEnv     environment providing the table configuration
+    * @param expectedType optional type the resulting DataSet should have
+    */
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    // if no equality keys => not supported
+    if (keyPairs.isEmpty) {
+      throw TableException(
+        "Joins should have at least one equality condition.\n" +
+          s"\tLeft: ${left.toString},\n" +
+          s"\tRight: ${right.toString},\n" +
+          s"\tCondition: ($joinConditionToString)"
+      )
+    }
+
+    // at least one equality expression: collect the key positions of both sides
+    val leftKeys = ArrayBuffer.empty[Int]
+    val rightKeys = ArrayBuffer.empty[Int]
+    val leftFields = left.getRowType.getFieldList
+    val rightFields = right.getRowType.getFieldList
+
+    for (pair <- keyPairs) {
+      val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
+      val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+
+      // keys are only compatible if both sides have the same SQL type
+      if (leftKeyType != rightKeyType) {
+        throw TableException(
+          "Equality join predicate on incompatible types.\n" +
+            s"\tLeft: ${left.toString},\n" +
+            s"\tRight: ${right.toString},\n" +
+            s"\tCondition: ($joinConditionToString)"
+        )
+      }
+
+      leftKeys += pair.source
+      rightKeys += pair.target
+    }
+
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+    // outer joins need null checks in the generated code, inner joins do not
+    val (joinOperator, nullCheck) = joinType match {
+      case JoinRelType.INNER => (leftDataSet.join(rightDataSet), false)
+      case JoinRelType.LEFT => (leftDataSet.leftOuterJoin(rightDataSet), true)
+      case JoinRelType.RIGHT => (leftDataSet.rightOuterJoin(rightDataSet), true)
+      case JoinRelType.FULL => (leftDataSet.fullOuterJoin(rightDataSet), true)
+    }
+
+    if (nullCheck && !config.getNullCheck) {
+      throw TableException("Null check in TableConfig must be enabled for outer joins.")
+    }
+
+    val generator = new CodeGenerator(
+      config,
+      nullCheck,
+      leftDataSet.getType,
+      Some(rightDataSet.getType))
+    val conversion = generator.generateConverterResultExpression(
+      returnType,
+      joinRowType.getFieldNames)
+
+    val body =
+      if (joinInfo.isEqui) {
+        // only equality condition: every joined pair is emitted
+        s"""
+           |${conversion.code}
+           |${generator.collectorTerm}.collect(${conversion.resultTerm});
+           |""".stripMargin
+      } else {
+        // remaining predicates are evaluated inside the generated function
+        val condition = generator.generateExpression(joinCondition)
+        s"""
+           |${condition.code}
+           |if (${condition.resultTerm}) {
+           | ${conversion.code}
+           | ${generator.collectorTerm}.collect(${conversion.resultTerm});
+           |}
+           |""".stripMargin
+      }
+
+    val genFunction = generator.generateFunction(
+      ruleDescription,
+      classOf[FlatJoinFunction[Any, Any, Any]],
+      body,
+      returnType)
+
+    val joinFun = new FlatJoinRunner[Any, Any, Any](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
+
+    val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)"
+
+    joinOperator
+      .where(leftKeys.toArray: _*)
+      .equalTo(rightKeys.toArray: _*)
+      .`with`(joinFun)
+      .name(joinOpName)
+      .asInstanceOf[DataSet[Any]]
+  }
+
+  /** Comma-separated names of the fields of this node's row type. */
+  private def joinSelectionToString: String = {
+    val fieldNames = getRowType.getFieldNames.asScala.toList
+    fieldNames.mkString(", ")
+  }
+
+  /** Join condition rendered over the field names of the join row type. */
+  private def joinConditionToString: String = {
+    val inFields = joinRowType.getFieldNames.asScala.toList
+    getExpressionString(joinCondition, inFields, None)
+  }
+
+  /** Display name of the join type. */
+  private def joinTypeToString: String = joinType match {
+    case JoinRelType.INNER => "InnerJoin"
+    case JoinRelType.LEFT => "LeftOuterJoin"
+    case JoinRelType.RIGHT => "RightOuterJoin"
+    case JoinRelType.FULL => "FullOuterJoin"
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
new file mode 100644
index 0000000..672ff9c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.runtime.MinusCoGroupFunction
+import org.apache.flink.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which implements the set minus operation as a CoGroup operator over all
+  * fields of both inputs.
+  */
+class DataSetMinus(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType,
+    all: Boolean)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  /** The row type of this node is the one handed to the constructor. */
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  /** Creates a copy with the given traits and the first two of the given inputs. */
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetMinus(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      getRowType,
+      all
+    )
+  }
+
+  override def toString: String = {
+    // The previous string contained a stray '}' after the field list, which was emitted
+    // literally; the format now mirrors DataSetIntersect.toString.
+    s"Minus(minus: ($minusSelectionToString))"
+  }
+
+  /** Adds the comma-separated field list to the plan explanation. */
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("minus", minusSelectionToString)
+  }
+
+  /**
+    * Sums, over all inputs, a cost whose row count and CPU part is the input's row count
+    * and whose IO part is the row count multiplied by the estimated row size.
+    */
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val children = this.getInputs
+    children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
+      val rowCnt = metadata.getRowCount(child)
+      val rowSize = this.estimateRowSize(child.getRowType)
+      cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
+    }
+  }
+
+  /**
+    * Estimates the row count as the row count of the first input minus half the row
+    * count of every further input, but never less than zero.
+    */
+  override def estimateRowCount(mq: RelMetadataQuery): Double = {
+    // from org.apache.calcite.rel.metadata.RelMdUtil.getMinusRowCount
+    val children = this.getInputs
+    val firstRowCnt: Double = mq.getRowCount(children.head)
+    val remaining = children.tail.foldLeft(firstRowCnt) { (rowCnt, child) =>
+      rowCnt - 0.5 * mq.getRowCount(child)
+    }
+    if (remaining < 0) 0.0 else remaining
+  }
+
+  /**
+    * Translates this node into a CoGroup of both translated inputs, followed by a type
+    * conversion map when the determined return type differs from the left input type.
+    *
+    * @param tableEnv     environment providing the table configuration
+    * @param expectedType optional type the resulting DataSet should have
+    */
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+    val coGroupedDs = leftDataSet.coGroup(rightDataSet)
+
+    val coGroupOpName = s"minus: ($minusSelectionToString)"
+    val coGroupFunction = new MinusCoGroupFunction[Any](all)
+
+    // co-group on every field of both sides
+    val minusDs = coGroupedDs.where("*").equalTo("*")
+      .`with`(coGroupFunction).name(coGroupOpName)
+
+    val config = tableEnv.getConfig
+    val leftType = leftDataSet.getType
+
+    // here we only care about left type information, because we emit records from left DataSet
+    expectedType match {
+      case None if config.getEfficientTypeUsage =>
+        minusDs
+
+      case _ =>
+        val determinedType = determineReturnType(
+          getRowType,
+          expectedType,
+          config.getNullCheck,
+          config.getEfficientTypeUsage)
+
+        // conversion
+        if (determinedType != leftType) {
+          val mapFunc = getConversionMapper(
+            config = config,
+            nullableInput = false,
+            inputType = leftType,
+            expectedType = determinedType,
+            conversionOperatorName = "DataSetMinusConversion",
+            fieldNames = getRowType.getFieldNames)
+
+          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+
+          minusDs.map(mapFunc).name(opName)
+        }
+        // no conversion necessary, forward
+        else {
+          minusDs
+        }
+    }
+  }
+
+  /** Comma-separated names of the fields of this node's row type. */
+  private def minusSelectionToString: String = {
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
new file mode 100644
index 0000000..210ae03
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.table.plan.nodes.FlinkRel
+
+import scala.collection.JavaConversions._
+
+trait DataSetRel extends RelNode with FlinkRel {
+
+  /**
+    * Turns this [[DataSetRel]] node into a [[DataSet]] operator.
+    *
+    * @param tableEnv     the [[BatchTableEnvironment]] of the Table being translated
+    * @param expectedType the type the resulting Flink operator should return; it must have
+    *                     the same arity as the result. With a RowTypeInfo the method
+    *                     returns a DataSet of type Row. With a Tuple2 the operator returns
+    *                     a Tuple2 if possible, and Row otherwise.
+    * @return DataSet of type expectedType or RowTypeInfo
+    */
+  def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]] = None) : DataSet[Any]
+
+  /**
+    * Estimates the size of a row by adding up a fixed per-type size for each field.
+    *
+    * @param rowType the row type whose fields are summed up
+    * @return the summed size estimate
+    * @throws TableException if a field has a SQL type without a size estimate
+    */
+  private[flink] def estimateRowSize(rowType: RelDataType): Double = {
+
+    // fixed size estimate for a single SQL type
+    def sizeOf(t: SqlTypeName): Int = t match {
+      case SqlTypeName.TINYINT | SqlTypeName.BOOLEAN | SqlTypeName.CHAR => 1
+      case SqlTypeName.SMALLINT => 2
+      case SqlTypeName.INTEGER | SqlTypeName.FLOAT => 4
+      case SqlTypeName.BIGINT | SqlTypeName.DOUBLE => 8
+      case SqlTypeName.VARCHAR | SqlTypeName.DECIMAL => 12
+      case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => 8
+      case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => 4
+      case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => 12
+      case _ => throw TableException(s"Unsupported data type encountered: $t")
+    }
+
+    val fieldTypes = rowType.getFieldList.map(_.getType.getSqlTypeName)
+    val total: Int = fieldTypes.foldLeft(0)((sum, fieldType) => sum + sizeOf(fieldType))
+    total.toDouble
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
new file mode 100644
index 0000000..48bbb74
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.plan.schema.DataSetTable
+
+/**
+  * Flink RelNode which scans the DataSet behind a [[DataSetTable]].
+  * It ensures that types without deterministic field order (e.g. POJOs) are not part of
+  * the plan translation.
+  */
+class DataSetScan(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    table: RelOptTable,
+    rowRelDataType: RelDataType)
+  extends BatchScan(cluster, traitSet, table) {
+
+  /** The [[DataSetTable]] unwrapped from the scanned table. */
+  val dataSetTable: DataSetTable[Any] = getTable.unwrap(classOf[DataSetTable[Any]])
+
+  /** The row type of this node is the one handed to the constructor. */
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  /** Creates a copy with the given traits; the `inputs` argument is not used. */
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetScan(cluster, traitSet, getTable, getRowType)
+  }
+
+  /**
+    * Returns the DataSet of the scanned table, converted to the expected type.
+    *
+    * @param tableEnv     environment providing the table configuration
+    * @param expectedType optional type the resulting DataSet should have
+    */
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val tableConfig = tableEnv.getConfig
+    val source: DataSet[Any] = dataSetTable.dataSet
+
+    convertToExpectedType(source, dataSetTable, expectedType, tableConfig)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
new file mode 100644
index 0000000..a70b4ab
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan._
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
+import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that executes a Join where one of inputs is a single row.
+  *
+  * The multi-row input is processed by a flatMap operator running a generated join
+  * function, while the single-row input is attached to that operator as a broadcast set.
+  *
+  * @param cluster         Calcite cluster this node belongs to
+  * @param traitSet        traits of this node
+  * @param leftNode        left join input
+  * @param rightNode       right join input
+  * @param leftIsSingle    true if the left input is the single-row side, false if the right is
+  * @param rowRelDataType  row type reported by this node
+  * @param joinCondition   join predicate that is compiled into the generated function
+  * @param joinRowType     row type whose field names are used for the result conversion and
+  *                        for printing the join condition
+  * @param ruleDescription name passed to the code generator for the generated function
+  */
+class DataSetSingleRowJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    leftIsSingle: Boolean,
+    rowRelDataType: RelDataType,
+    joinCondition: RexNode,
+    joinRowType: RelDataType,
+    ruleDescription: String)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  /** The row type is fixed at construction time. */
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  /** Creates a copy with new traits and inputs; all join-specific settings are carried over. */
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode =
+    new DataSetSingleRowJoin(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      leftIsSingle,
+      getRowType,
+      joinCondition,
+      joinRowType,
+      ruleDescription)
+
+  override def toString: String =
+    s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
+
+  /** Adds the join condition, the output fields and the join type to the plan explanation. */
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    val writer = super.explainTerms(pw)
+    writer
+      .item("where", joinConditionToString)
+      .item("join", joinSelectionToString)
+      .item("joinType", joinTypeToString)
+  }
+
+  /** Derives the cost from the row count and estimated row size of the multi-row input only. */
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val multiRowInput = if (leftIsSingle) getRight else getLeft
+    val rowCnt = metadata.getRowCount(multiRowInput)
+    val rowSize = estimateRowSize(multiRowInput.getRowType)
+    planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
+  }
+
+  /**
+    * Translates both inputs and joins them with a flatMap over the multi-row input.
+    *
+    * @param tableEnv     environment used to translate the inputs and to read the config
+    * @param expectedType optional type the caller wants the resulting records to have
+    */
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val broadcastSetName = "joinSet"
+
+    // the single-row side is broadcast, the other side drives the flatMap
+    val (multiRowDataSet, singleRowDataSet) =
+      if (leftIsSingle) (rightDataSet, leftDataSet) else (leftDataSet, rightDataSet)
+
+    val mapSideJoin = generateMapFunction(
+      tableEnv.getConfig,
+      leftDataSet.getType,
+      rightDataSet.getType,
+      leftIsSingle,
+      joinCondition,
+      broadcastSetName,
+      expectedType)
+
+    multiRowDataSet
+      .flatMap(mapSideJoin)
+      .withBroadcastSet(singleRowDataSet, broadcastSetName)
+      .name(getMapOperatorName)
+      .asInstanceOf[DataSet[Any]]
+  }
+
+  /**
+    * Generates the join function and wraps it into the runner matching the single-row side.
+    *
+    * The generated body evaluates the join condition and, if it holds, converts the joined
+    * row to the return type and emits it through the generator's collector.
+    */
+  private def generateMapFunction(
+      config: TableConfig,
+      inputType1: TypeInformation[Any],
+      inputType2: TypeInformation[Any],
+      firstIsSingle: Boolean,
+      joinCondition: RexNode,
+      broadcastInputSetName: String,
+      expectedType: Option[TypeInformation[Any]]): FlatMapFunction[Any, Any] = {
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    val codeGenerator = new CodeGenerator(
+      config,
+      false,
+      inputType1,
+      Some(inputType2))
+
+    // the result conversion is generated before the condition; keep this order
+    val conversion = codeGenerator.generateConverterResultExpression(
+      returnType,
+      joinRowType.getFieldNames.asScala)
+    val condition = codeGenerator.generateExpression(joinCondition)
+
+    val joinMethodBody = s"""
+      |${condition.code}
+      |if (${condition.resultTerm}) {
+      | ${conversion.code}
+      | ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
+      |}
+      |""".stripMargin
+
+    val genFunction = codeGenerator.generateFunction(
+      ruleDescription,
+      classOf[FlatJoinFunction[Any, Any, Any]],
+      joinMethodBody,
+      returnType)
+
+    val functionName = genFunction.name
+    val functionCode = genFunction.code
+    val functionReturnType = genFunction.returnType
+
+    if (firstIsSingle) {
+      new MapJoinRightRunner[Any, Any, Any](
+        functionName,
+        functionCode,
+        functionReturnType,
+        broadcastInputSetName)
+    } else {
+      new MapJoinLeftRunner[Any, Any, Any](
+        functionName,
+        functionCode,
+        functionReturnType,
+        broadcastInputSetName)
+    }
+  }
+
+  /** Name given to the flatMap operator that performs the join. */
+  private def getMapOperatorName: String =
+    s"where: ($joinConditionToString), join: ($joinSelectionToString)"
+
+  /** Comma-separated names of the output fields. */
+  private def joinSelectionToString: String =
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
+
+  /** The join condition, printed with the field names of the join row type. */
+  private def joinConditionToString: String =
+    getExpressionString(joinCondition, joinRowType.getFieldNames.asScala.toList, None)
+
+  private def joinTypeToString: String = "NestedLoopJoin"
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
new file mode 100644
index 0000000..428ea84
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import java.util
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex.{RexLiteral, RexNode}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.table.runtime.{CountPartitionFunction, LimitFilterFunction}
+import org.apache.flink.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode that sorts a DataSet and optionally restricts the result by offset / fetch.
+  *
+  * The input is range-partitioned on the sort fields (unless the parallelism is 1), each
+  * partition is sorted, and, if an offset or fetch is given, the rows are filtered by a
+  * [[LimitFilterFunction]] that receives the per-partition counts as a broadcast set.
+  *
+  * @param cluster        Calcite cluster this node belongs to
+  * @param traitSet       traits of this node
+  * @param inp            input node
+  * @param collations     sort fields and directions
+  * @param rowRelDataType row type reported by this node
+  * @param offset         offset literal, or null if no offset was specified
+  * @param fetch          fetch literal, or null if no fetch was specified
+  */
+class DataSetSort(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inp: RelNode,
+    collations: RelCollation,
+    rowRelDataType: RelDataType,
+    offset: RexNode,
+    fetch: RexNode)
+  extends SingleRel(cluster, traitSet, inp)
+  with DataSetRel {
+
+  // value of the offset literal, 0 if there is none
+  private val limitStart: Long =
+    Option(offset).fold(0L)(node => RexLiteral.intValue(node).toLong)
+
+  // offset + fetch, or Long.MaxValue if there is no fetch
+  private val limitEnd: Long =
+    Option(fetch).fold(Long.MaxValue)(node => RexLiteral.intValue(node) + limitStart)
+
+  // (field index, order) for every sort field, in collation order
+  private val fieldCollations = collations.getFieldCollations.asScala
+    .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))
+
+  private val sortFieldsToString = fieldCollations
+    .map { case (fieldIdx, order) =>
+      s"${getRowType.getFieldNames.get(fieldIdx)} ${order.getShortName}"
+    }
+    .mkString(", ")
+
+  // NOTE(review): renders as "null" when no offset was specified
+  private val offsetToString = s"$offset"
+
+  // NOTE(review): prints limitEnd (offset + fetch), not the raw fetch value
+  private val fetchToString = if (limitEnd == Long.MaxValue) "unlimited" else s"$limitEnd"
+
+  /** The row type is fixed at construction time. */
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  /** Creates a copy with new traits and input; sort and limit settings are carried over. */
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode =
+    new DataSetSort(cluster, traitSet, inputs.get(0), collations, getRowType, offset, fetch)
+
+  /**
+    * Translates the input, sorts it and applies offset / fetch and the type conversion.
+    *
+    * @param tableEnv     environment used to translate the input and to read the config
+    * @param expectedType optional type the caller wants the resulting records to have
+    * @throws TableException if no sort field is defined
+    */
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]] = None)
+    : DataSet[Any] = {
+
+    if (fieldCollations.isEmpty) {
+      throw TableException("Limiting the result without sorting is not allowed " +
+        "as it could lead to arbitrary results.")
+    }
+
+    val config = tableEnv.getConfig
+    val inputDs = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+
+    // range partitioning is only applied when there is more than one parallel instance
+    val rangePartitionedDs: DataSet[Any] =
+      if (inputDs.getExecutionEnvironment.getParallelism == 1) {
+        inputDs
+      } else {
+        inputDs
+          .partitionByRange(fieldCollations.map(_._1): _*)
+          .withOrders(fieldCollations.map(_._2): _*)
+      }
+
+    // add one sort key per collation field, in collation order
+    val sortedDs = fieldCollations.foldLeft(rangePartitionedDs) {
+      case (ds, (fieldIdx, order)) => ds.sortPartition(fieldIdx, order)
+    }
+
+    val limitedDs: DataSet[Any] =
+      if (offset != null || fetch != null) {
+        val broadcastName = "countPartition"
+
+        // result of CountPartitionFunction, broadcast to the limit filter below
+        val partitionCount = sortedDs
+          .mapPartition(new CountPartitionFunction[Any])
+          .name("prepare offset/fetch")
+
+        val limitFunction = new LimitFilterFunction[Any](limitStart, limitEnd, broadcastName)
+
+        sortedDs
+          .filter(limitFunction)
+          .name(s"offset: $offsetToString, fetch: $fetchToString")
+          .withBroadcastSet(partitionCount, broadcastName)
+      } else {
+        sortedDs
+      }
+
+    val inputType = sortedDs.getType
+
+    if (expectedType.isEmpty && config.getEfficientTypeUsage) {
+      // no type requested and efficient types enabled: forward as is
+      limitedDs
+    } else {
+      val determinedType = determineReturnType(
+        getRowType,
+        expectedType,
+        config.getNullCheck,
+        config.getEfficientTypeUsage)
+
+      if (determinedType == inputType) {
+        // no conversion necessary, forward
+        limitedDs
+      } else {
+        // conversion
+        val fieldNames = getRowType.getFieldNames.asScala
+
+        val mapFunc = getConversionMapper(
+          config = config,
+          nullableInput = false,
+          inputType = inputType,
+          expectedType = determinedType,
+          conversionOperatorName = "DataSetSortConversion",
+          fieldNames = fieldNames
+        )
+
+        limitedDs.map(mapFunc).name(s"convert: (${fieldNames.mkString(", ")})")
+      }
+    }
+  }
+
+  /** Maps a Calcite sort direction to a Flink [[Order]]; rejects any other direction. */
+  private def directionToOrder(direction: Direction): Order = direction match {
+    case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
+    case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
+    case _ => throw new IllegalArgumentException("Unsupported direction.")
+  }
+
+  override def toString: String =
+    s"Sort(by: ($sortFieldsToString), offset: $offsetToString, fetch: $fetchToString)"
+
+  /** Adds the sort fields, offset and fetch to the plan explanation. */
+  override def explainTerms(pw: RelWriter) : RelWriter = {
+    val writer = super.explainTerms(pw)
+    writer
+      .item("orderBy", sortFieldsToString)
+      .item("offset", offsetToString)
+      .item("fetch", fetchToString)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
new file mode 100644
index 0000000..b0c95b5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Flink RelNode which matches along with UnionOperator.
+  *
+  * Both inputs are translated to data sets of the same type and combined with
+  * `DataSet.union`.
+  *
+  * @param cluster        Calcite cluster this node belongs to
+  * @param traitSet       traits of this node
+  * @param leftNode       left union input
+  * @param rightNode      right union input
+  * @param rowRelDataType row type reported by this node
+  */
+class DataSetUnion(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
+  with DataSetRel {
+
+  /** The row type is fixed at construction time. */
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  /** Creates a copy with new traits and inputs. */
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetUnion(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      inputs.get(1),
+      rowRelDataType
+    )
+  }
+
+  override def toString: String = {
+    s"Union(union: ($unionSelectionToString))"
+  }
+
+  /** Adds the union's output fields to the plan explanation. */
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("union", unionSelectionToString)
+  }
+
+  /** Charges the summed row count of all inputs as row cost; no CPU or IO cost is added. */
+  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    planner.getCostFactory.makeCost(sumInputRowCounts(metadata), 0, 0)
+  }
+
+  override def estimateRowCount(mq: RelMetadataQuery): Double = {
+    // adopted from org.apache.calcite.rel.metadata.RelMdUtil.getUnionAllRowCount
+    sumInputRowCounts(mq)
+  }
+
+  /**
+    * Translates both inputs so that they produce the same type and unions them.
+    *
+    * @param tableEnv     environment used to translate the inputs
+    * @param expectedType optional type the caller wants the resulting records to have
+    */
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val leftRel = left.asInstanceOf[DataSetRel]
+    val rightRel = right.asInstanceOf[DataSetRel]
+
+    val (leftDataSet, rightDataSet) = expectedType match {
+      case None =>
+        // no type requested: the left input chooses its type and the right input is
+        // asked to produce exactly that type
+        val leftDs = leftRel.translateToPlan(tableEnv)
+        val rightDs = rightRel.translateToPlan(tableEnv, Some(leftDs.getType))
+        (leftDs, rightDs)
+      case _ =>
+        (leftRel.translateToPlan(tableEnv, expectedType),
+          rightRel.translateToPlan(tableEnv, expectedType))
+    }
+
+    leftDataSet.union(rightDataSet)
+  }
+
+  /** Sums the row counts that the metadata query reports for all inputs. */
+  private def sumInputRowCounts(mq: RelMetadataQuery): Double = {
+    getInputs.asScala.foldLeft(0.0) { (rows, input) =>
+      rows + mq.getRowCount(input)
+    }
+  }
+
+  /** Comma-separated names of the output fields. */
+  private def unionSelectionToString: String = {
+    rowRelDataType.getFieldNames.asScala.toList.mkString(", ")
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
new file mode 100644
index 0000000..e0282f2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Values
+import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.calcite.rex.RexLiteral
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.io.ValuesInputFormat
+import org.apache.flink.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConverters._
+
+/**
+  * DataSet RelNode for a LogicalValues.
+  *
+  * The literal rows are compiled into a generated input format that is used as the
+  * source of the resulting data set.
+  *
+  * @param cluster         Calcite cluster this node belongs to
+  * @param traitSet        traits of this node
+  * @param rowRelDataType  row type reported by this node
+  * @param tuples          the literal rows
+  * @param ruleDescription name passed to the code generator for the generated input format
+  */
+class DataSetValues(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    rowRelDataType: RelDataType,
+    tuples: ImmutableList[ImmutableList[RexLiteral]],
+    ruleDescription: String)
+  extends Values(cluster, rowRelDataType, tuples, traitSet)
+  with DataSetRel {
+
+  /** The row type is fixed at construction time. */
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  /** Creates a copy with the given traits; `inputs` is not used. */
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode =
+    new DataSetValues(cluster, traitSet, getRowType, getTuples, ruleDescription)
+
+  override def toString: String =
+    s"Values(values: ($valuesFieldsToString))"
+
+  /** Adds the names of the value fields to the plan explanation. */
+  override def explainTerms(pw: RelWriter): RelWriter =
+    super.explainTerms(pw).item("values", valuesFieldsToString)
+
+  /**
+    * Generates an input format that produces the literal rows and creates a data set from it.
+    *
+    * @param tableEnv     environment that supplies the config and the execution environment
+    * @param expectedType optional type the caller wants the resulting records to have
+    */
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+
+    val returnType = determineReturnType(
+      getRowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    val generator = new CodeGenerator(config)
+    val fieldNames = getRowType.getFieldNames.asScala
+
+    // generate code for every record
+    val recordCode = getTuples.asScala.map { row =>
+      generator.generateResultExpression(returnType, fieldNames, row.asScala).code
+    }
+
+    // generate input format
+    val generatedFunction = generator.generateValuesInputFormat(
+      ruleDescription,
+      recordCode,
+      returnType)
+
+    val inputFormat = new ValuesInputFormat[Any](
+      generatedFunction.name,
+      generatedFunction.code,
+      generatedFunction.returnType)
+
+    tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataSet[Any]]
+  }
+
+  /** Comma-separated names of the value fields. */
+  private def valuesFieldsToString: String =
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
+
+}
+
+