You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:03 UTC
[34/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
deleted file mode 100644
index 7932e11..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes
-
-import org.apache.calcite.rex._
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.runtime.MapRunner
-
-import scala.collection.JavaConversions._
-
-trait FlinkRel {
-
- private[flink] def getExpressionString(
- expr: RexNode,
- inFields: List[String],
- localExprsTable: Option[List[RexNode]]): String = {
-
- expr match {
- case i: RexInputRef =>
- inFields.get(i.getIndex)
-
- case l: RexLiteral =>
- l.toString
-
- case l: RexLocalRef if localExprsTable.isEmpty =>
- throw new IllegalArgumentException("Encountered RexLocalRef without " +
- "local expression table")
-
- case l: RexLocalRef =>
- val lExpr = localExprsTable.get(l.getIndex)
- getExpressionString(lExpr, inFields, localExprsTable)
-
- case c: RexCall =>
- val op = c.getOperator.toString
- val ops = c.getOperands.map(getExpressionString(_, inFields, localExprsTable))
- s"$op(${ops.mkString(", ")})"
-
- case fa: RexFieldAccess =>
- val referenceExpr = getExpressionString(fa.getReferenceExpr, inFields, localExprsTable)
- val field = fa.getField.getName
- s"$referenceExpr.$field"
-
- case _ =>
- throw new IllegalArgumentException(s"Unknown expression type '${expr.getClass}': $expr")
- }
- }
-
- private[flink] def getConversionMapper(
- config: TableConfig,
- nullableInput: Boolean,
- inputType: TypeInformation[Any],
- expectedType: TypeInformation[Any],
- conversionOperatorName: String,
- fieldNames: Seq[String],
- inputPojoFieldMapping: Option[Array[Int]] = None)
- : MapFunction[Any, Any] = {
-
- val generator = new CodeGenerator(
- config,
- nullableInput,
- inputType,
- None,
- inputPojoFieldMapping)
- val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
-
- val body =
- s"""
- |${conversion.code}
- |return ${conversion.resultTerm};
- |""".stripMargin
-
- val genFunction = generator.generateFunction(
- conversionOperatorName,
- classOf[MapFunction[Any, Any]],
- body,
- expectedType)
-
- new MapRunner[Any, Any](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
deleted file mode 100644
index a6de237..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchScan.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.core.TableScan
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.PojoTypeInfo
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.plan.schema.FlinkTable
-import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-abstract class BatchScan(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- table: RelOptTable)
- extends TableScan(cluster, traitSet, table)
- with DataSetRel {
-
- override def toString: String = {
- s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
- }
-
- override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-
- val rowCnt = metadata.getRowCount(this)
- planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
- }
-
- protected def convertToExpectedType(
- input: DataSet[Any],
- flinkTable: FlinkTable[_],
- expectedType: Option[TypeInformation[Any]],
- config: TableConfig): DataSet[Any] = {
-
- val inputType = input.getType
-
- expectedType match {
-
- // special case:
- // if efficient type usage is enabled and no expected type is set
- // we can simply forward the DataSet to the next operator.
- // however, we cannot forward PojoTypes as their fields don't have an order
- case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
- input
-
- case _ =>
- val determinedType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- // conversion
- if (determinedType != inputType) {
-
- val mapFunc = getConversionMapper(
- config,
- nullableInput = false,
- inputType,
- determinedType,
- "DataSetSourceConversion",
- getRowType.getFieldNames,
- Some(flinkTable.fieldIndexes))
-
- val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
- input.map(mapFunc).name(opName)
- }
- // no conversion necessary, forward
- else {
- input
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
deleted file mode 100644
index e368219..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory}
-import org.apache.flink.api.table.plan.schema.TableSourceTable
-import org.apache.flink.api.table.sources.BatchTableSource
-
-/** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */
-class BatchTableSourceScan(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- table: RelOptTable,
- val tableSource: BatchTableSource[_])
- extends BatchScan(cluster, traitSet, table) {
-
- override def deriveRowType() = {
- val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
- flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, tableSource.getFieldTypes)
- }
-
- override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
- val rowCnt = metadata.getRowCount(this)
- planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
- }
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new BatchTableSourceScan(
- cluster,
- traitSet,
- getTable,
- tableSource
- )
- }
-
- override def explainTerms(pw: RelWriter): RelWriter = {
- super.explainTerms(pw)
- .item("fields", tableSource.getFieldsNames.mkString(", "))
- }
-
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
- val config = tableEnv.getConfig
- val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]
-
- convertToExpectedType(inputDataSet, new TableSourceTable(tableSource), expectedType, config)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
deleted file mode 100644
index 94513d9..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.plan.nodes.FlinkAggregate
-import org.apache.flink.api.table.runtime.aggregate.AggregateUtil
-import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair
-import org.apache.flink.api.table.typeutils.TypeConverter
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory}
-import org.apache.flink.types.Row
-
-import scala.collection.JavaConverters._
-
-/**
- * Flink RelNode which matches along with a LogicalAggregate.
- */
-class DataSetAggregate(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- inputNode: RelNode,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- rowRelDataType: RelDataType,
- inputType: RelDataType,
- grouping: Array[Int])
- extends SingleRel(cluster, traitSet, inputNode)
- with FlinkAggregate
- with DataSetRel {
-
- override def deriveRowType() = rowRelDataType
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new DataSetAggregate(
- cluster,
- traitSet,
- inputs.get(0),
- namedAggregates,
- getRowType,
- inputType,
- grouping)
- }
-
- override def toString: String = {
- s"Aggregate(${ if (!grouping.isEmpty) {
- s"groupBy: (${groupingToString(inputType, grouping)}), "
- } else {
- ""
- }}select: (${aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)}))"
- }
-
- override def explainTerms(pw: RelWriter): RelWriter = {
- super.explainTerms(pw)
- .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
- .item("select", aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil))
- }
-
- override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-
- val child = this.getInput
- val rowCnt = metadata.getRowCount(child)
- val rowSize = this.estimateRowSize(child.getRowType)
- val aggCnt = this.namedAggregates.size
- planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
- }
-
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
- val config = tableEnv.getConfig
-
- val groupingKeys = grouping.indices.toArray
-
- val mapFunction = AggregateUtil.createPrepareMapFunction(
- namedAggregates,
- grouping,
- inputType)
-
- val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping)
-
- val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
- tableEnv,
- // tell the input operator that this operator currently only supports Rows as input
- Some(TypeConverter.DEFAULT_ROW_TYPE))
-
- // get the output types
- val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala
- .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
- .toArray
-
- val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
- val prepareOpName = s"prepare select: ($aggString)"
- val mappedInput = inputDS
- .map(mapFunction)
- .name(prepareOpName)
-
- val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
-
- val result = {
- if (groupingKeys.length > 0) {
- // grouped aggregation
- val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
- s"select: ($aggString)"
-
- mappedInput.asInstanceOf[DataSet[Row]]
- .groupBy(groupingKeys: _*)
- .reduceGroup(groupReduceFunction)
- .returns(rowTypeInfo)
- .name(aggOpName)
- .asInstanceOf[DataSet[Any]]
- }
- else {
- // global aggregation
- val aggOpName = s"select:($aggString)"
- mappedInput.asInstanceOf[DataSet[Row]]
- .reduceGroup(groupReduceFunction)
- .returns(rowTypeInfo)
- .name(aggOpName)
- .asInstanceOf[DataSet[Any]]
- }
- }
-
- // if the expected type is not a Row, inject a mapper to convert to the expected type
- expectedType match {
- case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
- val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
- result.map(getConversionMapper(
- config = config,
- nullableInput = false,
- inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
- expectedType = expectedType.get,
- conversionOperatorName = "DataSetAggregateConversion",
- fieldNames = getRowType.getFieldNames.asScala
- ))
- .name(mapName)
- case _ => result
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
deleted file mode 100644
index c0881b7..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCalc.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptPlanner, RelOptCost, RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.plan.nodes.FlinkCalc
-import org.apache.flink.api.table.typeutils.TypeConverter
-import TypeConverter._
-import org.apache.flink.api.table.BatchTableEnvironment
-import org.apache.calcite.rex._
-
-import scala.collection.JavaConverters._
-
-/**
- * Flink RelNode which matches along with LogicalCalc.
- *
- */
-class DataSetCalc(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- input: RelNode,
- rowRelDataType: RelDataType,
- private[flink] val calcProgram: RexProgram, // for tests
- ruleDescription: String)
- extends SingleRel(cluster, traitSet, input)
- with FlinkCalc
- with DataSetRel {
-
- override def deriveRowType() = rowRelDataType
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new DataSetCalc(
- cluster,
- traitSet,
- inputs.get(0),
- getRowType,
- calcProgram,
- ruleDescription)
- }
-
- override def toString: String = calcToString(calcProgram, getExpressionString)
-
- override def explainTerms(pw: RelWriter): RelWriter = {
- super.explainTerms(pw)
- .item("select", selectionToString(calcProgram, getExpressionString))
- .itemIf("where",
- conditionToString(calcProgram, getExpressionString),
- calcProgram.getCondition != null)
- }
-
- override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-
- val child = this.getInput
- val rowCnt = metadata.getRowCount(child)
-
- // compute number of expressions that do not access a field or literal, i.e. computations,
- // conditions, etc. We only want to account for computations, not for simple projections.
- val compCnt = calcProgram.getExprList.asScala.toList.count {
- case i: RexInputRef => false
- case l: RexLiteral => false
- case _ => true
- }
-
- planner.getCostFactory.makeCost(rowCnt, rowCnt * compCnt, 0)
- }
-
- override def estimateRowCount(metadata: RelMetadataQuery): Double = {
- val child = this.getInput
- val rowCnt = metadata.getRowCount(child)
-
- if (calcProgram.getCondition != null) {
- // we reduce the result card to push filters down
- (rowCnt * 0.75).min(1.0)
- } else {
- rowCnt
- }
- }
-
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
- val config = tableEnv.getConfig
-
- val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- val generator = new CodeGenerator(config, false, inputDS.getType)
-
- val body = functionBody(
- generator,
- inputDS.getType,
- getRowType,
- calcProgram,
- config,
- expectedType)
-
- val genFunction = generator.generateFunction(
- ruleDescription,
- classOf[FlatMapFunction[Any, Any]],
- body,
- returnType)
-
- val mapFunc = calcMapFunction(genFunction)
- inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
deleted file mode 100644
index 03d9a51..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan._
-
-class DataSetConvention extends Convention {
-
- override def toString: String = getName
-
- override def useAbstractConvertersForConversion(
- fromTraits: RelTraitSet,
- toTraits: RelTraitSet): Boolean = false
-
- override def canConvertConvention(toConvention: Convention): Boolean = false
-
- def getInterface: Class[_] = classOf[DataSetRel]
-
- def getName: String = "DATASET"
-
- def getTraitDef: RelTraitDef[_ <: RelTrait] = ConventionTraitDef.INSTANCE
-
- def satisfies(`trait`: RelTrait): Boolean = this eq `trait`
-
- def register(planner: RelOptPlanner): Unit = { }
-}
-
-object DataSetConvention {
-
- val INSTANCE = new DataSetConvention
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
deleted file mode 100644
index 95eb15b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.logical.LogicalTableFunctionScan
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rex.{RexNode, RexCall}
-import org.apache.calcite.sql.SemiJoinType
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.BatchTableEnvironment
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.functions.utils.TableSqlFunction
-import org.apache.flink.api.table.plan.nodes.FlinkCorrelate
-import org.apache.flink.api.table.typeutils.TypeConverter._
-
-/**
- * Flink RelNode which matches along with join a user defined table function.
- */
-class DataSetCorrelate(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- inputNode: RelNode,
- scan: LogicalTableFunctionScan,
- condition: Option[RexNode],
- relRowType: RelDataType,
- joinRowType: RelDataType,
- joinType: SemiJoinType,
- ruleDescription: String)
- extends SingleRel(cluster, traitSet, inputNode)
- with FlinkCorrelate
- with DataSetRel {
-
- override def deriveRowType() = relRowType
-
- override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
- val rowCnt = metadata.getRowCount(getInput) * 1.5
- planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 0.5)
- }
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new DataSetCorrelate(
- cluster,
- traitSet,
- inputs.get(0),
- scan,
- condition,
- relRowType,
- joinRowType,
- joinType,
- ruleDescription)
- }
-
- override def toString: String = {
- val rexCall = scan.getCall.asInstanceOf[RexCall]
- val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
- correlateToString(rexCall, sqlFunction)
- }
-
- override def explainTerms(pw: RelWriter): RelWriter = {
- val rexCall = scan.getCall.asInstanceOf[RexCall]
- val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
- super.explainTerms(pw)
- .item("invocation", scan.getCall)
- .item("function", sqlFunction.getTableFunction.getClass.getCanonicalName)
- .item("rowType", relRowType)
- .item("joinType", joinType)
- .itemIf("condition", condition.orNull, condition.isDefined)
- }
-
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]])
- : DataSet[Any] = {
-
- val config = tableEnv.getConfig
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- // we do not need to specify input type
- val inputDS = inputNode.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-
- val funcRel = scan.asInstanceOf[LogicalTableFunctionScan]
- val rexCall = funcRel.getCall.asInstanceOf[RexCall]
- val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
- val pojoFieldMapping = sqlFunction.getPojoFieldMapping
- val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
-
- val generator = new CodeGenerator(
- config,
- false,
- inputDS.getType,
- Some(udtfTypeInfo),
- None,
- Some(pojoFieldMapping))
-
- val body = functionBody(
- generator,
- udtfTypeInfo,
- getRowType,
- rexCall,
- condition,
- config,
- joinType,
- expectedType)
-
- val genFunction = generator.generateFunction(
- ruleDescription,
- classOf[FlatMapFunction[Any, Any]],
- body,
- returnType)
-
- val mapFunc = correlateMapFunction(genFunction)
-
- inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
deleted file mode 100644
index d2203d0..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.BatchTableEnvironment
-import org.apache.flink.api.table.runtime.IntersectCoGroupFunction
-import org.apache.flink.api.table.typeutils.TypeConverter._
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-/**
- * Flink RelNode which translates Intersect into CoGroup Operator.
- *
- */
-class DataSetIntersect(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- leftNode: RelNode,
- rightNode: RelNode,
- rowRelDataType: RelDataType,
- all: Boolean)
- extends BiRel(cluster, traitSet, leftNode, rightNode)
- with DataSetRel {
-
- override def deriveRowType() = rowRelDataType
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new DataSetIntersect(
- cluster,
- traitSet,
- inputs.get(0),
- inputs.get(1),
- getRowType,
- all
- )
- }
-
- override def toString: String = {
- s"Intersect(intersect: ($intersectSelectionToString))"
- }
-
- override def explainTerms(pw: RelWriter): RelWriter = {
- super.explainTerms(pw).item("intersect", intersectSelectionToString)
- }
-
- override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
- val children = this.getInputs
- children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
- val rowCnt = metadata.getRowCount(child)
- val rowSize = this.estimateRowSize(child.getRowType)
- cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
- }
- }
-
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
- val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-
- val coGroupedDs = leftDataSet.coGroup(rightDataSet)
-
- val coGroupOpName = s"intersect: ($intersectSelectionToString)"
- val coGroupFunction = new IntersectCoGroupFunction[Any](all)
-
- val intersectDs = coGroupedDs.where("*").equalTo("*")
- .`with`(coGroupFunction).name(coGroupOpName)
-
- val config = tableEnv.getConfig
- val leftType = leftDataSet.getType
-
- // here we only care about left type information, because we emit records from left DataSet
- expectedType match {
- case None if config.getEfficientTypeUsage =>
- intersectDs
-
- case _ =>
- val determinedType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- // conversion
- if (determinedType != leftType) {
- val mapFunc = getConversionMapper(
- config,
- false,
- leftType,
- determinedType,
- "DataSetIntersectConversion",
- getRowType.getFieldNames)
-
- val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
- intersectDs.map(mapFunc).name(opName)
- }
- // no conversion necessary, forward
- else {
- intersectDs
- }
- }
- }
-
- private def intersectSelectionToString: String = {
- getRowType.getFieldNames.asScala.toList.mkString(", ")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
deleted file mode 100644
index ccd84ca..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.calcite.util.mapping.IntPair
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.runtime.FlatJoinRunner
-import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
-import org.apache.flink.api.table.{BatchTableEnvironment, TableException}
-import org.apache.flink.api.common.functions.FlatJoinFunction
-import org.apache.calcite.rex.RexNode
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * Flink RelNode which matches along with JoinOperator and its related operations.
- */
-class DataSetJoin(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- leftNode: RelNode,
- rightNode: RelNode,
- rowRelDataType: RelDataType,
- joinCondition: RexNode,
- joinRowType: RelDataType,
- joinInfo: JoinInfo,
- keyPairs: List[IntPair],
- joinType: JoinRelType,
- joinHint: JoinHint,
- ruleDescription: String)
- extends BiRel(cluster, traitSet, leftNode, rightNode)
- with DataSetRel {
-
- override def deriveRowType() = rowRelDataType
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new DataSetJoin(
- cluster,
- traitSet,
- inputs.get(0),
- inputs.get(1),
- getRowType,
- joinCondition,
- joinRowType,
- joinInfo,
- keyPairs,
- joinType,
- joinHint,
- ruleDescription)
- }
-
- override def toString: String = {
- s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
- }
-
- override def explainTerms(pw: RelWriter): RelWriter = {
- super.explainTerms(pw)
- .item("where", joinConditionToString)
- .item("join", joinSelectionToString)
- .item("joinType", joinTypeToString)
- }
-
- override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
-
- val leftRowCnt = metadata.getRowCount(getLeft)
- val leftRowSize = estimateRowSize(getLeft.getRowType)
-
- val rightRowCnt = metadata.getRowCount(getRight)
- val rightRowSize = estimateRowSize(getRight.getRowType)
-
- val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
- val cpuCost = leftRowCnt + rightRowCnt
- val rowCnt = leftRowCnt + rightRowCnt
-
- planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
- }
-
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
- val config = tableEnv.getConfig
-
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- // get the equality keys
- val leftKeys = ArrayBuffer.empty[Int]
- val rightKeys = ArrayBuffer.empty[Int]
- if (keyPairs.isEmpty) {
- // if no equality keys => not supported
- throw TableException(
- "Joins should have at least one equality condition.\n" +
- s"\tLeft: ${left.toString},\n" +
- s"\tRight: ${right.toString},\n" +
- s"\tCondition: ($joinConditionToString)"
- )
- }
- else {
- // at least one equality expression
- val leftFields = left.getRowType.getFieldList
- val rightFields = right.getRowType.getFieldList
-
- keyPairs.foreach(pair => {
- val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
- val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
-
- // check if keys are compatible
- if (leftKeyType == rightKeyType) {
- // add key pair
- leftKeys.add(pair.source)
- rightKeys.add(pair.target)
- } else {
- throw TableException(
- "Equality join predicate on incompatible types.\n" +
- s"\tLeft: ${left.toString},\n" +
- s"\tRight: ${right.toString},\n" +
- s"\tCondition: ($joinConditionToString)"
- )
- }
- })
- }
-
- val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-
- val (joinOperator, nullCheck) = joinType match {
- case JoinRelType.INNER => (leftDataSet.join(rightDataSet), false)
- case JoinRelType.LEFT => (leftDataSet.leftOuterJoin(rightDataSet), true)
- case JoinRelType.RIGHT => (leftDataSet.rightOuterJoin(rightDataSet), true)
- case JoinRelType.FULL => (leftDataSet.fullOuterJoin(rightDataSet), true)
- }
-
- if (nullCheck && !config.getNullCheck) {
- throw TableException("Null check in TableConfig must be enabled for outer joins.")
- }
-
- val generator = new CodeGenerator(
- config,
- nullCheck,
- leftDataSet.getType,
- Some(rightDataSet.getType))
- val conversion = generator.generateConverterResultExpression(
- returnType,
- joinRowType.getFieldNames)
-
- var body = ""
-
- if (joinInfo.isEqui) {
- // only equality condition
- body = s"""
- |${conversion.code}
- |${generator.collectorTerm}.collect(${conversion.resultTerm});
- |""".stripMargin
- }
- else {
- val condition = generator.generateExpression(joinCondition)
- body = s"""
- |${condition.code}
- |if (${condition.resultTerm}) {
- | ${conversion.code}
- | ${generator.collectorTerm}.collect(${conversion.resultTerm});
- |}
- |""".stripMargin
- }
- val genFunction = generator.generateFunction(
- ruleDescription,
- classOf[FlatJoinFunction[Any, Any, Any]],
- body,
- returnType)
-
- val joinFun = new FlatJoinRunner[Any, Any, Any](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
-
- val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)"
-
- joinOperator.where(leftKeys.toArray: _*).equalTo(rightKeys.toArray: _*)
- .`with`(joinFun).name(joinOpName).asInstanceOf[DataSet[Any]]
- }
-
- private def joinSelectionToString: String = {
- getRowType.getFieldNames.asScala.toList.mkString(", ")
- }
-
- private def joinConditionToString: String = {
-
- val inFields = joinRowType.getFieldNames.asScala.toList
- getExpressionString(joinCondition, inFields, None)
- }
-
- private def joinTypeToString = joinType match {
- case JoinRelType.INNER => "InnerJoin"
- case JoinRelType.LEFT=> "LeftOuterJoin"
- case JoinRelType.RIGHT => "RightOuterJoin"
- case JoinRelType.FULL => "FullOuterJoin"
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
deleted file mode 100644
index 6a5cbd1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.BatchTableEnvironment
-import org.apache.flink.api.table.runtime.MinusCoGroupFunction
-import org.apache.flink.api.table.typeutils.TypeConverter._
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-/**
- * Flink RelNode which implements set minus operation.
- *
- */
-class DataSetMinus(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- leftNode: RelNode,
- rightNode: RelNode,
- rowRelDataType: RelDataType,
- all: Boolean)
- extends BiRel(cluster, traitSet, leftNode, rightNode)
- with DataSetRel {
-
- override def deriveRowType() = rowRelDataType
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new DataSetMinus(
- cluster,
- traitSet,
- inputs.get(0),
- inputs.get(1),
- getRowType,
- all
- )
- }
-
- override def toString: String = {
- s"Minus(minus: ($minusSelectionToString}))"
- }
-
- override def explainTerms(pw: RelWriter): RelWriter = {
- super.explainTerms(pw).item("minus", minusSelectionToString)
- }
-
- override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
- val children = this.getInputs
- children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
- val rowCnt = metadata.getRowCount(child)
- val rowSize = this.estimateRowSize(child.getRowType)
- cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
- }
- }
-
- override def estimateRowCount(mq: RelMetadataQuery): Double = {
- // from org.apache.calcite.rel.metadata.RelMdUtil.getMinusRowCount
- val children = this.getInputs
- var rowCnt = mq.getRowCount(children.head)
- getInputs.tail.foreach(rowCnt -= 0.5 * mq.getRowCount(_))
- if (rowCnt < 0) {
- rowCnt = 0.0
- }
- rowCnt
- }
-
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
- val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
-
- val coGroupedDs = leftDataSet.coGroup(rightDataSet)
-
- val coGroupOpName = s"minus: ($minusSelectionToString)"
- val coGroupFunction = new MinusCoGroupFunction[Any](all)
-
- val minusDs = coGroupedDs.where("*").equalTo("*")
- .`with`(coGroupFunction).name(coGroupOpName)
-
- val config = tableEnv.getConfig
- val leftType = leftDataSet.getType
-
- // here we only care about left type information, because we emit records from left DataSet
- expectedType match {
- case None if config.getEfficientTypeUsage =>
- minusDs
-
- case _ =>
- val determinedType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- // conversion
- if (determinedType != leftType) {
- val mapFunc = getConversionMapper(
- config = config,
- nullableInput = false,
- inputType = leftType,
- expectedType = determinedType,
- conversionOperatorName = "DataSetMinusConversion",
- fieldNames = getRowType.getFieldNames)
-
- val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
- minusDs.map(mapFunc).name(opName)
- }
- // no conversion necessary, forward
- else {
- minusDs
- }
- }
- }
-
- private def minusSelectionToString: String = {
- getRowType.getFieldNames.asScala.toList.mkString(", ")
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
deleted file mode 100644
index 82c75e1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.plan.nodes.FlinkRel
-import org.apache.flink.api.table.runtime.MapRunner
-import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableException}
-
-import scala.collection.JavaConversions._
-
-trait DataSetRel extends RelNode with FlinkRel {
-
- /**
- * Translates the [[DataSetRel]] node into a [[DataSet]] operator.
- *
- * @param tableEnv [[org.apache.flink.api.table.BatchTableEnvironment]] of the translated Table.
- * @param expectedType specifies the type the Flink operator should return. The type must
- * have the same arity as the result. For instance, if the
- * expected type is a RowTypeInfo this method will return a DataSet of
- * type Row. If the expected type is Tuple2, the operator will return
- * a Tuple2 if possible. Row otherwise.
- * @return DataSet of type expectedType or RowTypeInfo
- */
- def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]] = None) : DataSet[Any]
-
- private[flink] def estimateRowSize(rowType: RelDataType): Double = {
-
- rowType.getFieldList.map(_.getType.getSqlTypeName).foldLeft(0) { (s, t) =>
- t match {
- case SqlTypeName.TINYINT => s + 1
- case SqlTypeName.SMALLINT => s + 2
- case SqlTypeName.INTEGER => s + 4
- case SqlTypeName.BIGINT => s + 8
- case SqlTypeName.BOOLEAN => s + 1
- case SqlTypeName.FLOAT => s + 4
- case SqlTypeName.DOUBLE => s + 8
- case SqlTypeName.VARCHAR => s + 12
- case SqlTypeName.CHAR => s + 1
- case SqlTypeName.DECIMAL => s + 12
- case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 8
- case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 4
- case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12
- case _ => throw TableException(s"Unsupported data type encountered: $t")
- }
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala
deleted file mode 100644
index b783136..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetScan.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.BatchTableEnvironment
-import org.apache.flink.api.table.plan.schema.DataSetTable
-
-/**
- * Flink RelNode which matches along with DataSource.
- * It ensures that types without deterministic field order (e.g. POJOs) are not part of
- * the plan translation.
- */
-class DataSetScan(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- table: RelOptTable,
- rowRelDataType: RelDataType)
- extends BatchScan(cluster, traitSet, table) {
-
- val dataSetTable: DataSetTable[Any] = getTable.unwrap(classOf[DataSetTable[Any]])
-
- override def deriveRowType() = rowRelDataType
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new DataSetScan(
- cluster,
- traitSet,
- getTable,
- getRowType
- )
- }
-
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
- val config = tableEnv.getConfig
- val inputDataSet: DataSet[Any] = dataSetTable.dataSet
-
- convertToExpectedType(inputDataSet, dataSetTable, expectedType, config)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
deleted file mode 100644
index 0306c00..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
-import org.apache.flink.api.table.typeutils.TypeConverter.determineReturnType
-import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig}
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-/**
- * Flink RelNode that executes a Join where one of inputs is a single row.
- */
-class DataSetSingleRowJoin(
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- leftNode: RelNode,
- rightNode: RelNode,
- leftIsSingle: Boolean,
- rowRelDataType: RelDataType,
- joinCondition: RexNode,
- joinRowType: RelDataType,
- ruleDescription: String)
- extends BiRel(cluster, traitSet, leftNode, rightNode)
- with DataSetRel {
-
- override def deriveRowType() = rowRelDataType
-
- override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
- new DataSetSingleRowJoin(
- cluster,
- traitSet,
- inputs.get(0),
- inputs.get(1),
- leftIsSingle,
- getRowType,
- joinCondition,
- joinRowType,
- ruleDescription)
- }
-
- override def toString: String = {
- s"$joinTypeToString(where: ($joinConditionToString), join: ($joinSelectionToString))"
- }
-
- override def explainTerms(pw: RelWriter): RelWriter = {
- super.explainTerms(pw)
- .item("where", joinConditionToString)
- .item("join", joinSelectionToString)
- .item("joinType", joinTypeToString)
- }
-
- override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
- val child = if (leftIsSingle) {
- this.getRight
- } else {
- this.getLeft
- }
- val rowCnt = metadata.getRowCount(child)
- val rowSize = this.estimateRowSize(child.getRowType)
- planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
- }
-
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
- val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val broadcastSetName = "joinSet"
- val mapSideJoin = generateMapFunction(
- tableEnv.getConfig,
- leftDataSet.getType,
- rightDataSet.getType,
- leftIsSingle,
- joinCondition,
- broadcastSetName,
- expectedType)
-
- val (multiRowDataSet, singleRowDataSet) =
- if (leftIsSingle) {
- (rightDataSet, leftDataSet)
- } else {
- (leftDataSet, rightDataSet)
- }
-
- multiRowDataSet
- .flatMap(mapSideJoin)
- .withBroadcastSet(singleRowDataSet, broadcastSetName)
- .name(getMapOperatorName)
- .asInstanceOf[DataSet[Any]]
- }
-
- private def generateMapFunction(
- config: TableConfig,
- inputType1: TypeInformation[Any],
- inputType2: TypeInformation[Any],
- firstIsSingle: Boolean,
- joinCondition: RexNode,
- broadcastInputSetName: String,
- expectedType: Option[TypeInformation[Any]]): FlatMapFunction[Any, Any] = {
-
- val codeGenerator = new CodeGenerator(
- config,
- false,
- inputType1,
- Some(inputType2))
-
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- val conversion = codeGenerator.generateConverterResultExpression(
- returnType,
- joinRowType.getFieldNames)
-
- val condition = codeGenerator.generateExpression(joinCondition)
-
- val joinMethodBody = s"""
- |${condition.code}
- |if (${condition.resultTerm}) {
- | ${conversion.code}
- | ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
- |}
- |""".stripMargin
-
- val genFunction = codeGenerator.generateFunction(
- ruleDescription,
- classOf[FlatJoinFunction[Any, Any, Any]],
- joinMethodBody,
- returnType)
-
- if (firstIsSingle) {
- new MapJoinRightRunner[Any, Any, Any](
- genFunction.name,
- genFunction.code,
- genFunction.returnType,
- broadcastInputSetName)
- } else {
- new MapJoinLeftRunner[Any, Any, Any](
- genFunction.name,
- genFunction.code,
- genFunction.returnType,
- broadcastInputSetName)
- }
- }
-
- private def getMapOperatorName: String = {
- s"where: ($joinConditionToString), join: ($joinSelectionToString)"
- }
-
- private def joinSelectionToString: String = {
- getRowType.getFieldNames.asScala.toList.mkString(", ")
- }
-
- private def joinConditionToString: String = {
- val inFields = joinRowType.getFieldNames.asScala.toList
- getExpressionString(joinCondition, inFields, None)
- }
-
- private def joinTypeToString: String = {
- "NestedLoopJoin"
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
deleted file mode 100644
index 661aeef..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import java.util
-
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.RelFieldCollation.Direction
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rex.{RexLiteral, RexNode}
-import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.{BatchTableEnvironment, TableException}
-import org.apache.flink.api.table.runtime.{CountPartitionFunction, LimitFilterFunction}
-import org.apache.flink.api.table.typeutils.TypeConverter._
-
-import scala.collection.JavaConverters._
-
/**
  * DataSet RelNode for a sort, optionally combined with OFFSET/FETCH limiting.
  *
  * Translates into: range-partitioning on the sort keys (skipped when parallelism
  * is 1), a per-partition sort, and — if offset/fetch are present — a broadcast of
  * per-partition row counts consumed by a filter that implements the limit.
  */
class DataSetSort(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    inp: RelNode,
    collations: RelCollation,
    rowRelDataType: RelDataType,
    offset: RexNode,
    fetch: RexNode)
  extends SingleRel(cluster, traitSet, inp)
  with DataSetRel {

  // index of the first row to emit (0 when no OFFSET literal is given)
  private val limitStart: Long = if (offset != null) {
    RexLiteral.intValue(offset)
  } else {
    0L
  }

  // exclusive upper bound: offset + fetch, or unbounded when FETCH is absent
  private val limitEnd: Long = if (fetch != null) {
    RexLiteral.intValue(fetch) + limitStart
  } else {
    Long.MaxValue
  }

  override def deriveRowType(): RelDataType = rowRelDataType

  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
    new DataSetSort(
      cluster,
      traitSet,
      inputs.get(0),
      collations,
      getRowType,
      offset,
      fetch
    )
  }

  override def translateToPlan(
      tableEnv: BatchTableEnvironment,
      expectedType: Option[TypeInformation[Any]] = None)
    : DataSet[Any] = {

    // a limit without a sort order would return arbitrary (non-deterministic) rows
    if (fieldCollations.isEmpty) {
      throw TableException("Limiting the result without sorting is not allowed " +
        "as it could lead to arbitrary results.")
    }

    val config = tableEnv.getConfig

    val inputDs = inp.asInstanceOf[DataSetRel].translateToPlan(tableEnv)

    // with parallelism 1 a local sort already yields a total order; otherwise
    // range-partition on the sort keys so the per-partition sorts compose into
    // a globally ordered result
    val currentParallelism = inputDs.getExecutionEnvironment.getParallelism
    var partitionedDs = if (currentParallelism == 1) {
      inputDs
    } else {
      inputDs.partitionByRange(fieldCollations.map(_._1): _*)
        .withOrders(fieldCollations.map(_._2): _*)
    }

    // sort each partition on every collation field, in collation order
    fieldCollations.foreach { fieldCollation =>
      partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
    }

    val limitedDs = if (offset == null && fetch == null) {
      partitionedDs
    } else {
      // count rows per partition and broadcast the counts so each parallel
      // instance of the limit filter knows its global row offset
      val countFunction = new CountPartitionFunction[Any]

      val partitionCountName = s"prepare offset/fetch"

      val partitionCount = partitionedDs
        .mapPartition(countFunction)
        .name(partitionCountName)

      val broadcastName = "countPartition"

      val limitFunction = new LimitFilterFunction[Any](
        limitStart,
        limitEnd,
        broadcastName)

      val limitName = s"offset: $offsetToString, fetch: $fetchToString"

      partitionedDs
        .filter(limitFunction)
        .name(limitName)
        .withBroadcastSet(partitionCount, broadcastName)
    }

    // add a conversion mapper when the caller expects a different physical type
    val inputType = partitionedDs.getType
    expectedType match {

      case None if config.getEfficientTypeUsage =>
        limitedDs

      case _ =>
        val determinedType = determineReturnType(
          getRowType,
          expectedType,
          config.getNullCheck,
          config.getEfficientTypeUsage)

        // conversion
        if (determinedType != inputType) {

          val mapFunc = getConversionMapper(
            config = config,
            nullableInput = false,
            inputType = partitionedDs.getType,
            expectedType = determinedType,
            conversionOperatorName = "DataSetSortConversion",
            fieldNames = getRowType.getFieldNames.asScala
          )

          val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"

          limitedDs.map(mapFunc).name(opName)
        }
        // no conversion necessary, forward
        else {
          limitedDs
        }
    }
  }

  // maps Calcite sort directions onto Flink's Order; strict and non-strict
  // ascending/descending collapse to the same Flink order
  private def directionToOrder(direction: Direction) = {
    direction match {
      case Direction.ASCENDING | Direction.STRICTLY_ASCENDING => Order.ASCENDING
      case Direction.DESCENDING | Direction.STRICTLY_DESCENDING => Order.DESCENDING
      case _ => throw new IllegalArgumentException("Unsupported direction.")
    }

  }

  // (field index, order) pairs of the sort keys, in collation order
  private val fieldCollations = collations.getFieldCollations.asScala
    .map(c => (c.getFieldIndex, directionToOrder(c.getDirection)))

  // e.g. "name ASC, age DESC" — used by toString and explainTerms
  private val sortFieldsToString = fieldCollations
    .map(col => s"${getRowType.getFieldNames.get(col._1)} ${col._2.getShortName}" ).mkString(", ")

  private val offsetToString = s"$offset"

  private val fetchToString = if (limitEnd == Long.MaxValue) {
    "unlimited"
  } else {
    s"$limitEnd"
  }

  override def toString: String =
    s"Sort(by: ($sortFieldsToString), offset: $offsetToString, fetch: $fetchToString)"

  override def explainTerms(pw: RelWriter) : RelWriter = {
    super.explainTerms(pw)
      .item("orderBy", sortFieldsToString)
      .item("offset", offsetToString)
      .item("fetch", fetchToString)
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
deleted file mode 100644
index 6e43fae..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.BatchTableEnvironment
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
/**
  * Flink RelNode that translates a relational union (UNION ALL) into a
  * DataSet union of its two translated inputs.
  */
class DataSetUnion(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    leftNode: RelNode,
    rightNode: RelNode,
    rowRelDataType: RelDataType)
  extends BiRel(cluster, traitSet, leftNode, rightNode)
  with DataSetRel {

  override def deriveRowType() = rowRelDataType

  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    new DataSetUnion(
      cluster,
      traitSet,
      inputs.get(0),
      inputs.get(1),
      rowRelDataType
    )
  }

  override def toString: String = {
    s"Union(union: ($unionSelectionToString))"
  }

  override def explainTerms(pw: RelWriter): RelWriter = {
    super.explainTerms(pw).item("union", unionSelectionToString)
  }

  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
    // cost is the summed cardinality of all inputs; union adds no CPU or IO cost
    val rowCnt = this.getInputs
      .map(child => metadata.getRowCount(child).doubleValue())
      .sum
    planner.getCostFactory.makeCost(rowCnt, 0, 0)
  }

  override def estimateRowCount(mq: RelMetadataQuery): Double = {
    // adopted from org.apache.calcite.rel.metadata.RelMdUtil.getUnionAllRowCount
    getInputs.map(child => mq.getRowCount(child).doubleValue()).sum
  }

  override def translateToPlan(
      tableEnv: BatchTableEnvironment,
      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {

    // when no expected type is given, translate the left side first and push
    // its concrete type into the right side so both inputs line up
    val (leftDataSet, rightDataSet) = expectedType match {
      case None =>
        val leftDs = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
        val rightDs =
          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDs.getType))
        (leftDs, rightDs)
      case _ =>
        val leftDs = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
        val rightDs = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
        (leftDs, rightDs)
    }

    leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
  }

  /** Comma-separated list of the union's output field names. */
  private def unionSelectionToString: String =
    rowRelDataType.getFieldNames.asScala.mkString(", ")

}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
deleted file mode 100644
index 4f3a257..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes.dataset
-
-import com.google.common.collect.ImmutableList
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.Values
-import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.calcite.rex.RexLiteral
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.BatchTableEnvironment
-import org.apache.flink.api.table.codegen.CodeGenerator
-import org.apache.flink.api.table.runtime.io.ValuesInputFormat
-import org.apache.flink.api.table.typeutils.TypeConverter._
-
-import scala.collection.JavaConverters._
-
/**
  * DataSet RelNode for a LogicalValues: a constant, inlined set of rows that is
  * code-generated into a ValuesInputFormat.
  */
class DataSetValues(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    rowRelDataType: RelDataType,
    tuples: ImmutableList[ImmutableList[RexLiteral]],
    ruleDescription: String)
  extends Values(cluster, rowRelDataType, tuples, traitSet)
  with DataSetRel {

  override def deriveRowType() = rowRelDataType

  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    new DataSetValues(cluster, traitSet, getRowType, getTuples, ruleDescription)
  }

  override def toString: String = {
    s"Values(values: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
  }

  override def explainTerms(pw: RelWriter): RelWriter = {
    super.explainTerms(pw).item("values", valuesFieldsToString)
  }

  override def translateToPlan(
      tableEnv: BatchTableEnvironment,
      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {

    val config = tableEnv.getConfig
    val codeGen = new CodeGenerator(config)

    // physical type the generated records must produce
    val returnType = determineReturnType(
      getRowType,
      expectedType,
      config.getNullCheck,
      config.getEfficientTypeUsage)

    // generate one result expression per constant row
    val fieldNames = getRowType.getFieldNames.asScala
    val generatedRecords = getTuples.asScala.map { record =>
      codeGen.generateResultExpression(returnType, fieldNames, record.asScala)
    }

    // wrap all generated records into a single input format
    val generatedFunction = codeGen.generateValuesInputFormat(
      ruleDescription,
      generatedRecords.map(_.code),
      returnType)

    val inputFormat = new ValuesInputFormat[Any](
      generatedFunction.name,
      generatedFunction.code,
      generatedFunction.returnType)

    tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataSet[Any]]
  }

  /** Comma-separated list of the value rows' field names. */
  private def valuesFieldsToString: String =
    getRowType.getFieldNames.asScala.mkString(", ")

}
-
-