Posted to commits@flink.apache.org by tw...@apache.org on 2017/02/13 16:51:23 UTC
[2/3] flink git commit: [FLINK-5662] [table] Rework internal type handling of Table API
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index 03178ad..245a038 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -22,15 +22,15 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex._
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.plan.nodes.FlinkCalc
-import org.apache.flink.table.typeutils.TypeConverter
-import TypeConverter._
-import org.apache.calcite.rex._
import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.nodes.CommonCalc
+import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -46,7 +46,7 @@ class DataSetCalc(
private[flink] val calcProgram: RexProgram, // for tests
ruleDescription: String)
extends SingleRel(cluster, traitSet, input)
- with FlinkCalc
+ with CommonCalc
with DataSetRel {
override def deriveRowType() = rowRelDataType
@@ -99,19 +99,13 @@ class DataSetCalc(
}
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val config = tableEnv.getConfig
val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
+ val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
val generator = new CodeGenerator(config, false, inputDS.getType)
@@ -120,12 +114,11 @@ class DataSetCalc(
inputDS.getType,
getRowType,
calcProgram,
- config,
- expectedType)
+ config)
val genFunction = generator.generateFunction(
ruleDescription,
- classOf[FlatMapFunction[Any, Any]],
+ classOf[FlatMapFunction[Row, Row]],
body,
returnType)
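
The hunk above shows the pattern that repeats through the whole patch: the
two-argument translateToPlan(tableEnv, expectedType) collapses into a
one-argument method that always produces the internal row type. A minimal
sketch of the new contract, assuming the post-patch classes from this commit
are on the classpath (the helper name translateCalc is illustrative only):

import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.types.Row

// Sketch: every DataSetRel now yields DataSet[Row]. The return type is
// derived from the relational row type instead of being negotiated
// through an expectedType parameter.
def translateCalc(calc: DataSetCalc, tableEnv: BatchTableEnvironment): DataSet[Row] = {
  val rows: DataSet[Row] = calc.translateToPlan(tableEnv) // always rows
  // a concrete node generates and applies a FlatMapFunction[Row, Row]
  // typed with this internal row type info
  val internalType = FlinkTypeFactory.toInternalRowTypeInfo(calc.getRowType)
  rows
}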
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
index 5a75e5d..c18a829 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -28,8 +28,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.functions.utils.TableSqlFunction
-import org.apache.flink.table.plan.nodes.FlinkCorrelate
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.types.Row
/**
* Flink RelNode which matches along with join a user defined table function.
@@ -45,7 +45,7 @@ class DataSetCorrelate(
joinType: SemiJoinType,
ruleDescription: String)
extends SingleRel(cluster, traitSet, inputNode)
- with FlinkCorrelate
+ with CommonCorrelate
with DataSetRel {
override def deriveRowType() = relRowType
@@ -85,10 +85,7 @@ class DataSetCorrelate(
.itemIf("condition", condition.orNull, condition.isDefined)
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]])
- : DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val config = tableEnv.getConfig
@@ -109,7 +106,6 @@ class DataSetCorrelate(
joinType,
rexCall,
condition,
- expectedType,
Some(pojoFieldMapping),
ruleDescription)
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
index 332aa8a..4497df3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
@@ -22,11 +22,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.runtime.IntersectCoGroupFunction
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -75,55 +74,21 @@ class DataSetIntersect(
}
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
- val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val coGroupedDs = leftDataSet.coGroup(rightDataSet)
val coGroupOpName = s"intersect: ($intersectSelectionToString)"
- val coGroupFunction = new IntersectCoGroupFunction[Any](all)
-
- val intersectDs = coGroupedDs.where("*").equalTo("*")
- .`with`(coGroupFunction).name(coGroupOpName)
-
- val config = tableEnv.getConfig
- val leftType = leftDataSet.getType
-
- // here we only care about left type information, because we emit records from left DataSet
- expectedType match {
- case None if config.getEfficientTypeUsage =>
- intersectDs
-
- case _ =>
- val determinedType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- // conversion
- if (determinedType != leftType) {
- val mapFunc = getConversionMapper(
- config,
- false,
- leftType,
- determinedType,
- "DataSetIntersectConversion",
- getRowType.getFieldNames)
-
- val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
- intersectDs.map(mapFunc).name(opName)
- }
- // no conversion necessary, forward
- else {
- intersectDs
- }
- }
+ val coGroupFunction = new IntersectCoGroupFunction[Row](all)
+
+ coGroupedDs
+ .where("*")
+ .equalTo("*")
+ .`with`(coGroupFunction)
+ .name(coGroupOpName)
}
private def intersectSelectionToString: String = {
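
Because both inputs are now guaranteed to be DataSet[Row], the whole
post-coGroup conversion branch disappears and the CoGroup result is returned
as-is (DataSetMinus below gets the identical treatment). For readers
unfamiliar with the idiom, a self-contained toy of a coGroup on the full
element ("*" key); the emit logic is an assumed reading of INTERSECT
DISTINCT semantics, not the actual IntersectCoGroupFunction:

import java.lang.{Iterable => JIterable}
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.util.Collector

object IntersectSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val left  = env.fromElements("a", "b", "b")
    val right = env.fromElements("b", "c")

    left.coGroup(right)
      .where("*").equalTo("*") // group on the complete element, as in the patch
      .`with`(new CoGroupFunction[String, String, String] {
        override def coGroup(
            l: JIterable[String],
            r: JIterable[String],
            out: Collector[String]): Unit = {
          // emit one record per value that occurs on both sides
          if (l.iterator().hasNext && r.iterator().hasNext) {
            out.collect(l.iterator().next())
          }
        }
      })
      .print() // prints: b
  }
}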
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index edb5be2..e6f8ca4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -23,17 +23,16 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.FlatJoinFunction
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.runtime.FlatJoinRunner
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
-import org.apache.flink.api.common.functions.FlatJoinFunction
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.calcite.sql.SqlKind
-import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -102,17 +101,11 @@ class DataSetJoin(
planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val config = tableEnv.getConfig
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
+ val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
// get the equality keys
val leftKeys = ArrayBuffer.empty[Int]
@@ -195,19 +188,22 @@ class DataSetJoin(
}
val genFunction = generator.generateFunction(
ruleDescription,
- classOf[FlatJoinFunction[Any, Any, Any]],
+ classOf[FlatJoinFunction[Row, Row, Row]],
body,
returnType)
- val joinFun = new FlatJoinRunner[Any, Any, Any](
+ val joinFun = new FlatJoinRunner[Row, Row, Row](
genFunction.name,
genFunction.code,
genFunction.returnType)
val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)"
- joinOperator.where(leftKeys.toArray: _*).equalTo(rightKeys.toArray: _*)
- .`with`(joinFun).name(joinOpName).asInstanceOf[DataSet[Any]]
+ joinOperator
+ .where(leftKeys.toArray: _*)
+ .equalTo(rightKeys.toArray: _*)
+ .`with`(joinFun)
+ .name(joinOpName)
}
private def joinSelectionToString: String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
index 672ff9c..9ba65bf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
@@ -22,11 +22,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.runtime.MinusCoGroupFunction
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -86,55 +85,21 @@ class DataSetMinus(
rowCnt
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
- val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val coGroupedDs = leftDataSet.coGroup(rightDataSet)
val coGroupOpName = s"minus: ($minusSelectionToString)"
- val coGroupFunction = new MinusCoGroupFunction[Any](all)
-
- val minusDs = coGroupedDs.where("*").equalTo("*")
- .`with`(coGroupFunction).name(coGroupOpName)
-
- val config = tableEnv.getConfig
- val leftType = leftDataSet.getType
-
- // here we only care about left type information, because we emit records from left DataSet
- expectedType match {
- case None if config.getEfficientTypeUsage =>
- minusDs
-
- case _ =>
- val determinedType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- // conversion
- if (determinedType != leftType) {
- val mapFunc = getConversionMapper(
- config = config,
- nullableInput = false,
- inputType = leftType,
- expectedType = determinedType,
- conversionOperatorName = "DataSetMinusConversion",
- fieldNames = getRowType.getFieldNames)
-
- val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
- minusDs.map(mapFunc).name(opName)
- }
- // no conversion necessary, forward
- else {
- minusDs
- }
- }
+ val coGroupFunction = new MinusCoGroupFunction[Row](all)
+
+ coGroupedDs
+ .where("*")
+ .equalTo("*")
+ .`with`(coGroupFunction)
+ .name(coGroupOpName)
}
private def minusSelectionToString: String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
index 02138cf..980f3cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
@@ -19,26 +19,19 @@
package org.apache.flink.table.plan.nodes.dataset
import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.plan.nodes.FlinkRel
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
+import org.apache.flink.types.Row
trait DataSetRel extends RelNode with FlinkRel {
/**
* Translates the [[DataSetRel]] node into a [[DataSet]] operator.
*
- * @param tableEnv [[BatchTableEnvironment]] of the translated Table.
- * @param expectedType specifies the type the Flink operator should return. The type must
- * have the same arity as the result. For instance, if the
- * expected type is a RowTypeInfo this method will return a DataSet of
- * type Row. If the expected type is Tuple2, the operator will return
- * a Tuple2 if possible. Row otherwise.
- * @return DataSet of type expectedType or RowTypeInfo
+ * @param tableEnv The [[BatchTableEnvironment]] of the translated Table.
+ * @return DataSet of type [[Row]]
*/
- def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]] = None) : DataSet[Any]
+ def translateToPlan(tableEnv: BatchTableEnvironment) : DataSet[Row]
}
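
Since the trait no longer accepts an expectedType, conversion into the
user-facing type can only happen once, at the boundary where the environment
hands the translated DataSet[Row] back to the caller. A hedged sketch of
that boundary step; toUserType and its converter argument are hypothetical
helpers, not APIs introduced by this patch:

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.types.Row

// Hypothetical boundary conversion: the plan is translated to rows once,
// and a single mapper converts to whatever type the user requested.
def toUserType[T](
    rows: DataSet[Row],
    targetType: TypeInformation[T],
    converter: MapFunction[Row, T]): DataSet[T] = {
  rows.map(converter).returns(targetType)
}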
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
index 48bbb74..44d2d00 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
@@ -21,10 +21,10 @@ package org.apache.flink.table.plan.nodes.dataset
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.plan.schema.DataSetTable
+import org.apache.flink.types.Row
/**
* Flink RelNode which matches along with DataSource.
@@ -51,14 +51,12 @@ class DataSetScan(
)
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val config = tableEnv.getConfig
val inputDataSet: DataSet[Any] = dataSetTable.dataSet
- convertToExpectedType(inputDataSet, dataSetTable, expectedType, config)
+ convertToInternalRow(inputDataSet, dataSetTable, config)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
index a70b4ab..b7d1a4b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
@@ -26,10 +26,11 @@ import org.apache.calcite.rex.RexNode
import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
-import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -87,9 +88,7 @@ class DataSetSingleRowJoin(
planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
@@ -100,8 +99,7 @@ class DataSetSingleRowJoin(
rightDataSet.getType,
leftIsSingle,
joinCondition,
- broadcastSetName,
- expectedType)
+ broadcastSetName)
val (multiRowDataSet, singleRowDataSet) =
if (leftIsSingle) {
@@ -114,17 +112,16 @@ class DataSetSingleRowJoin(
.flatMap(mapSideJoin)
.withBroadcastSet(singleRowDataSet, broadcastSetName)
.name(getMapOperatorName)
- .asInstanceOf[DataSet[Any]]
}
private def generateMapFunction(
config: TableConfig,
- inputType1: TypeInformation[Any],
- inputType2: TypeInformation[Any],
+ inputType1: TypeInformation[Row],
+ inputType2: TypeInformation[Row],
firstIsSingle: Boolean,
joinCondition: RexNode,
- broadcastInputSetName: String,
- expectedType: Option[TypeInformation[Any]]): FlatMapFunction[Any, Any] = {
+ broadcastInputSetName: String)
+ : FlatMapFunction[Row, Row] = {
val codeGenerator = new CodeGenerator(
config,
@@ -132,11 +129,7 @@ class DataSetSingleRowJoin(
inputType1,
Some(inputType2))
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
+ val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
val conversion = codeGenerator.generateConverterResultExpression(
returnType,
@@ -144,28 +137,29 @@ class DataSetSingleRowJoin(
val condition = codeGenerator.generateExpression(joinCondition)
- val joinMethodBody = s"""
- |${condition.code}
- |if (${condition.resultTerm}) {
- | ${conversion.code}
- | ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
- |}
- |""".stripMargin
+ val joinMethodBody =
+ s"""
+ |${condition.code}
+ |if (${condition.resultTerm}) {
+ | ${conversion.code}
+ | ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
+ |}
+ |""".stripMargin
val genFunction = codeGenerator.generateFunction(
ruleDescription,
- classOf[FlatJoinFunction[Any, Any, Any]],
+ classOf[FlatJoinFunction[Row, Row, Row]],
joinMethodBody,
returnType)
if (firstIsSingle) {
- new MapJoinRightRunner[Any, Any, Any](
+ new MapJoinRightRunner[Row, Row, Row](
genFunction.name,
genFunction.code,
genFunction.returnType,
broadcastInputSetName)
} else {
- new MapJoinLeftRunner[Any, Any, Any](
+ new MapJoinLeftRunner[Row, Row, Row](
genFunction.name,
genFunction.code,
genFunction.returnType,
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
index 4d84730..192237a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
@@ -27,11 +27,10 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.{RexLiteral, RexNode}
import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
import org.apache.flink.table.runtime.{CountPartitionFunction, LimitFilterFunction}
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -87,10 +86,7 @@ class DataSetSort(
}
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]] = None)
- : DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
if (fieldCollations.isEmpty) {
throw TableException("Limiting the result without sorting is not allowed " +
@@ -113,10 +109,10 @@ class DataSetSort(
partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
}
- val limitedDs = if (offset == null && fetch == null) {
+ if (offset == null && fetch == null) {
partitionedDs
} else {
- val countFunction = new CountPartitionFunction[Any]
+ val countFunction = new CountPartitionFunction[Row]
val partitionCountName = s"prepare offset/fetch"
@@ -126,7 +122,7 @@ class DataSetSort(
val broadcastName = "countPartition"
- val limitFunction = new LimitFilterFunction[Any](
+ val limitFunction = new LimitFilterFunction[Row](
limitStart,
limitEnd,
broadcastName)
@@ -138,41 +134,6 @@ class DataSetSort(
.name(limitName)
.withBroadcastSet(partitionCount, broadcastName)
}
-
- val inputType = partitionedDs.getType
- expectedType match {
-
- case None if config.getEfficientTypeUsage =>
- limitedDs
-
- case _ =>
- val determinedType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- // conversion
- if (determinedType != inputType) {
-
- val mapFunc = getConversionMapper(
- config = config,
- nullableInput = false,
- inputType = partitionedDs.getType,
- expectedType = determinedType,
- conversionOperatorName = "DataSetSortConversion",
- fieldNames = getRowType.getFieldNames.asScala
- )
-
- val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
- limitedDs.map(mapFunc).name(opName)
- }
- // no conversion necessary, forward
- else {
- limitedDs
- }
- }
}
private def directionToOrder(direction: Direction) = {
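
DataSetSort keeps its count-then-filter implementation of offset/fetch
(CountPartitionFunction broadcast into LimitFilterFunction) and merely stops
re-converting the result. The mechanics, reduced to a self-contained sketch
over plain integers; both function bodies approximate what the runtime
classes do and are assumptions, and correct LIMIT semantics additionally
require the range-partitioned sort that precedes this step:

import java.lang.{Integer => JInt, Iterable => JIterable}
import org.apache.flink.api.common.functions.{RichFilterFunction, RichMapPartitionFunction}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

object LimitSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements[JInt](1, 2, 3, 4, 5, 6, 7, 8)
    val offset = 2
    val fetch = 3

    // step 1: count the records of every parallel partition
    val counts = data.mapPartition(
      new RichMapPartitionFunction[JInt, JTuple2[JInt, JInt]] {
        override def mapPartition(
            values: JIterable[JInt],
            out: Collector[JTuple2[JInt, JInt]]): Unit = {
          var c = 0
          val it = values.iterator()
          while (it.hasNext) { it.next(); c += 1 }
          out.collect(new JTuple2(getRuntimeContext.getIndexOfThisSubtask, c))
        }
      })

    // step 2: each subtask derives the global position of its first record
    // from the broadcast counts and keeps only [offset, offset + fetch)
    val limited = data
      .filter(new RichFilterFunction[JInt] {
        private var globalPos = 0L
        override def open(parameters: Configuration): Unit = {
          val cs = getRuntimeContext
            .getBroadcastVariable[JTuple2[JInt, JInt]]("counts").asScala
          val me = getRuntimeContext.getIndexOfThisSubtask
          globalPos = cs.filter(_.f0 < me).map(_.f1.toLong).sum
        }
        override def filter(v: JInt): Boolean = {
          val keep = globalPos >= offset && globalPos < offset + fetch
          globalPos += 1
          keep
        }
      })
      .withBroadcastSet(counts, "counts")

    limited.print() // keeps exactly `fetch` records starting at global position `offset`
  }
}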
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
index b0c95b5..a87c6e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
@@ -22,9 +22,9 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -77,24 +77,12 @@ class DataSetUnion(
getInputs.foldLeft(0.0)(_ + mq.getRowCount(_))
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
- var leftDataSet: DataSet[Any] = null
- var rightDataSet: DataSet[Any] = null
-
- expectedType match {
- case None =>
- leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- rightDataSet =
- right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
- case _ =>
- leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
- rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
- }
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+
+ val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+ leftDataSet.union(rightDataSet)
}
private def unionSelectionToString: String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
index e0282f2..3ebee2c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
@@ -24,12 +24,12 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Values
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rex.RexLiteral
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.runtime.io.ValuesInputFormat
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -66,17 +66,11 @@ class DataSetValues(
super.explainTerms(pw).item("values", valuesFieldsToString)
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val config = tableEnv.getConfig
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
+ val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
val generator = new CodeGenerator(config)
@@ -94,12 +88,12 @@ class DataSetValues(
generatedRecords.map(_.code),
returnType)
- val inputFormat = new ValuesInputFormat[Any](
+ val inputFormat = new ValuesInputFormat[Row](
generatedFunction.name,
generatedFunction.code,
generatedFunction.returnType)
- tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataSet[Any]]
+ tableEnv.execEnv.createInput(inputFormat, returnType)
}
private def valuesFieldsToString: String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
index b165afa..48de822 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
@@ -23,21 +23,17 @@ import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.plan.nodes.CommonAggregate
import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.typeutils.TypeConverter
import org.apache.flink.types.Row
-import scala.collection.JavaConversions._
-
/**
* Flink RelNode which matches along with a LogicalWindowAggregate.
*/
@@ -52,7 +48,7 @@ class DataSetWindowAggregate(
inputType: RelDataType,
grouping: Array[Int])
extends SingleRel(cluster, traitSet, inputNode)
- with FlinkAggregate
+ with CommonAggregate
with DataSetRel {
override def deriveRowType() = rowRelDataType
@@ -109,20 +105,15 @@ class DataSetWindowAggregate(
planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val config = tableEnv.getConfig
- val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
- tableEnv,
- // tell the input operator that this operator currently only supports Rows as input
- Some(TypeConverter.DEFAULT_ROW_TYPE))
+ val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
// whether identifiers are matched case-sensitively
val caseSensitive = tableEnv.getFrameworkConfig.getParserConfig.caseSensitive()
- val result = window match {
+ window match {
case EventTimeTumblingGroupWindow(_, _, size) =>
createEventTimeTumblingWindowDataSet(
inputDS,
@@ -139,31 +130,14 @@ class DataSetWindowAggregate(
"windows in a batch environment must declare a time attribute over which " +
"the query is evaluated.")
}
-
- // if the expected type is not a Row, inject a mapper to convert to the expected type
- expectedType match {
- case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
- val mapName = s"convert: (${getRowType.getFieldNames.toList.mkString(", ")})"
- result.map(
- getConversionMapper(
- config = config,
- nullableInput = false,
- inputType = resultRowTypeInfo.asInstanceOf[TypeInformation[Any]],
- expectedType = expectedType.get,
- conversionOperatorName = "DataSetWindowAggregateConversion",
- fieldNames = getRowType.getFieldNames
- ))
- .name(mapName)
- case _ => result
- }
}
private def createEventTimeTumblingWindowDataSet(
- inputDS: DataSet[Any],
+ inputDS: DataSet[Row],
isTimeWindow: Boolean,
isParserCaseSensitive: Boolean)
- : DataSet[Any] = {
+ : DataSet[Row] = {
val mapFunction = createDataSetWindowPrepareMapFunction(
window,
namedAggregates,
@@ -182,6 +156,8 @@ class DataSetWindowAggregate(
.map(mapFunction)
.name(prepareOperatorName)
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+
val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
if (isTimeWindow) {
// grouped time window aggregation
@@ -190,9 +166,8 @@ class DataSetWindowAggregate(
mappedInput.asInstanceOf[DataSet[Row]]
.groupBy(groupingKeys: _*)
.reduceGroup(groupReduceFunction)
- .returns(resultRowTypeInfo)
+ .returns(rowTypeInfo)
.name(aggregateOperatorName)
- .asInstanceOf[DataSet[Any]]
} else {
// count window
val groupingKeys = grouping.indices.toArray
@@ -203,10 +178,8 @@ class DataSetWindowAggregate(
// sort on time field, it's the last element in the row
.sortGroup(mapReturnType.getArity - 1, Order.ASCENDING)
.reduceGroup(groupReduceFunction)
- .returns(resultRowTypeInfo)
+ .returns(rowTypeInfo)
.name(aggregateOperatorName)
- .asInstanceOf[DataSet[Any]]
-
} else {
// TODO: count tumbling all window on event-time should sort all the data set
// on event time before applying the windowing logic.
@@ -217,11 +190,12 @@ class DataSetWindowAggregate(
}
private[this] def createEventTimeSessionWindowDataSet(
- inputDS: DataSet[Any],
- isParserCaseSensitive: Boolean): DataSet[Any] = {
+ inputDS: DataSet[Row],
+ isParserCaseSensitive: Boolean)
+ : DataSet[Row] = {
val groupingKeys = grouping.indices.toArray
- val rowTypeInfo = resultRowTypeInfo
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
// grouping window
if (groupingKeys.length > 0) {
@@ -280,7 +254,6 @@ class DataSetWindowAggregate(
.reduceGroup(groupReduceFunction)
.returns(rowTypeInfo)
.name(aggregateOperatorName)
- .asInstanceOf[DataSet[Any]]
}
// do non-incremental aggregation
else {
@@ -298,7 +271,6 @@ class DataSetWindowAggregate(
.reduceGroup(groupReduceFunction)
.returns(rowTypeInfo)
.name(aggregateOperatorName)
- .asInstanceOf[DataSet[Any]]
}
}
// non-grouping window
@@ -332,12 +304,4 @@ class DataSetWindowAggregate(
s"window: ($window), select: ($aggString)"
}
}
-
- private def resultRowTypeInfo: RowTypeInfo = {
- // get the output types
- val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList
- .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
- .toArray
- new RowTypeInfo(fieldTypes: _*)
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
index 6a3d4e3..c21d008 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
@@ -22,27 +22,23 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.types.Row
-import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.expressions._
import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.plan.nodes.CommonAggregate
import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
import org.apache.flink.table.runtime.aggregate.AggregateUtil._
import org.apache.flink.table.runtime.aggregate._
import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeConverter}
-import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
-import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.StreamTableEnvironment
-
-import scala.collection.JavaConverters._
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.types.Row
class DataStreamAggregate(
window: LogicalWindow,
@@ -55,7 +51,7 @@ class DataStreamAggregate(
inputType: RelDataType,
grouping: Array[Int])
extends SingleRel(cluster, traitSet, inputNode)
- with FlinkAggregate
+ with CommonAggregate
with DataStreamRel {
override def deriveRowType(): RelDataType = rowRelDataType
@@ -103,24 +99,12 @@ class DataStreamAggregate(
namedProperties))
}
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
- val config = tableEnv.getConfig
val groupingKeys = grouping.indices.toArray
- val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(
- tableEnv,
- // tell the input operator that this operator currently only supports Rows as input
- Some(TypeConverter.DEFAULT_ROW_TYPE))
-
- // get the output types
- val fieldTypes: Array[TypeInformation[_]] =
- getRowType.getFieldList.asScala
- .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
- .toArray
+ val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
- val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
val aggString = aggregationToString(
inputType,
@@ -142,121 +126,100 @@ class DataStreamAggregate(
val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
- val result: DataStream[Any] = {
- // check whether all aggregates support partial aggregate
- if (AggregateUtil.doAllSupportPartialAggregation(
- namedAggregates.map(_.getKey),
- inputType,
- grouping.length)) {
- // do Incremental Aggregation
- val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
+
+ // check whether all aggregates support partial aggregate
+ if (AggregateUtil.doAllSupportPartialAggregation(
+ namedAggregates.map(_.getKey),
+ inputType,
+ grouping.length)) {
+ // do Incremental Aggregation
+ val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
+ namedAggregates,
+ inputType,
+ getRowType,
+ grouping)
+ // grouped / keyed aggregation
+ if (groupingKeys.length > 0) {
+ val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
+ window,
namedAggregates,
inputType,
- getRowType,
- grouping)
- // grouped / keyed aggregation
- if (groupingKeys.length > 0) {
- val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
- window,
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping,
- namedProperties)
-
- val keyedStream = mappedInput.keyBy(groupingKeys: _*)
- val windowedStream =
- createKeyedWindowedStream(window, keyedStream)
- .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
- windowedStream
- .apply(reduceFunction, windowFunction)
- .returns(rowTypeInfo)
- .name(keyedAggOpName)
- .asInstanceOf[DataStream[Any]]
- }
- // global / non-keyed aggregation
- else {
- val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
- window,
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping,
- namedProperties)
-
- val windowedStream =
- createNonKeyedWindowedStream(window, mappedInput)
- .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
- windowedStream
- .apply(reduceFunction, windowFunction)
- .returns(rowTypeInfo)
- .name(nonKeyedAggOpName)
- .asInstanceOf[DataStream[Any]]
- }
+ rowRelDataType,
+ grouping,
+ namedProperties)
+
+ val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+ val windowedStream =
+ createKeyedWindowedStream(window, keyedStream)
+ .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+ windowedStream
+ .reduce(reduceFunction, windowFunction)
+ .returns(rowTypeInfo)
+ .name(keyedAggOpName)
}
+ // global / non-keyed aggregation
else {
- // do non-Incremental Aggregation
- // grouped / keyed aggregation
- if (groupingKeys.length > 0) {
-
- val windowFunction = AggregateUtil.createWindowAggregationFunction(
- window,
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping,
- namedProperties)
-
- val keyedStream = mappedInput.keyBy(groupingKeys: _*)
- val windowedStream =
- createKeyedWindowedStream(window, keyedStream)
- .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
- windowedStream
- .apply(windowFunction)
- .returns(rowTypeInfo)
- .name(keyedAggOpName)
- .asInstanceOf[DataStream[Any]]
- }
- // global / non-keyed aggregation
- else {
- val windowFunction = AggregateUtil.createAllWindowAggregationFunction(
- window,
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping,
- namedProperties)
-
- val windowedStream =
- createNonKeyedWindowedStream(window, mappedInput)
- .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
- windowedStream
- .apply(windowFunction)
- .returns(rowTypeInfo)
- .name(nonKeyedAggOpName)
- .asInstanceOf[DataStream[Any]]
- }
+ val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
+ window,
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ grouping,
+ namedProperties)
+
+ val windowedStream =
+ createNonKeyedWindowedStream(window, mappedInput)
+ .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+ windowedStream
+ .reduce(reduceFunction, windowFunction)
+ .returns(rowTypeInfo)
+ .name(nonKeyedAggOpName)
}
}
+ else {
+ // do non-Incremental Aggregation
+ // grouped / keyed aggregation
+ if (groupingKeys.length > 0) {
+
+ val windowFunction = AggregateUtil.createWindowAggregationFunction(
+ window,
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ grouping,
+ namedProperties)
+
+ val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+ val windowedStream =
+ createKeyedWindowedStream(window, keyedStream)
+ .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
- // if the expected type is not a Row, inject a mapper to convert to the expected type
- expectedType match {
- case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
- val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
- result.map(getConversionMapper(
- config = config,
- nullableInput = false,
- inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
- expectedType = expectedType.get,
- conversionOperatorName = "DataStreamAggregateConversion",
- fieldNames = getRowType.getFieldNames.asScala
- ))
- .name(mapName)
- case _ => result
+ windowedStream
+ .apply(windowFunction)
+ .returns(rowTypeInfo)
+ .name(keyedAggOpName)
+ }
+ // global / non-keyed aggregation
+ else {
+ val windowFunction = AggregateUtil.createAllWindowAggregationFunction(
+ window,
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ grouping,
+ namedProperties)
+
+ val windowedStream =
+ createNonKeyedWindowedStream(window, mappedInput)
+ .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+ windowedStream
+ .apply(windowFunction)
+ .returns(rowTypeInfo)
+ .name(nonKeyedAggOpName)
+ }
}
}
}
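
Besides dropping expectedType, the incremental path above now calls
WindowedStream.reduce(reduceFunction, windowFunction) instead of
apply(reduceFunction, windowFunction). The shape of that call, as a hedged,
self-contained sketch over plain tuples and count windows rather than the
generated Row functions and logical windows of the patch:

import java.lang.{Integer => JInt, Iterable => JIterable}
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.java.tuple.{Tuple, Tuple2 => JTuple2}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

object WindowReduceSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromElements(
      new JTuple2("a", JInt.valueOf(1)),
      new JTuple2("a", JInt.valueOf(2)),
      new JTuple2("b", JInt.valueOf(3)))

    stream
      .keyBy(0)       // key type becomes Tuple, as in the WindowedStream cast above
      .countWindow(2)
      .reduce(
        // incremental part: one combined value is kept per key and window
        new ReduceFunction[JTuple2[String, JInt]] {
          override def reduce(
              a: JTuple2[String, JInt],
              b: JTuple2[String, JInt]): JTuple2[String, JInt] =
            new JTuple2(a.f0, JInt.valueOf(a.f1 + b.f1))
        },
        // finalizing part: sees only the single pre-aggregated record
        new WindowFunction[JTuple2[String, JInt], String, Tuple, GlobalWindow] {
          override def apply(
              key: Tuple,
              window: GlobalWindow,
              input: JIterable[JTuple2[String, JInt]],
              out: Collector[String]): Unit = {
            val agg = input.iterator().next()
            out.collect(s"${agg.f0} -> ${agg.f1}")
          }
        })
      .print() // "a -> 3"; key "b" never reaches the window size of 2

    env.execute("window reduce sketch")
  }
}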
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 43f1fb6..b39ae4a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -22,13 +22,13 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.RexProgram
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.plan.nodes.FlinkCalc
-import org.apache.flink.table.typeutils.TypeConverter._
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.nodes.CommonCalc
+import org.apache.flink.types.Row
/**
* Flink RelNode which matches along with FlatMapOperator.
@@ -42,7 +42,7 @@ class DataStreamCalc(
private[flink] val calcProgram: RexProgram,
ruleDescription: String)
extends SingleRel(cluster, traitSet, input)
- with FlinkCalc
+ with CommonCalc
with DataStreamRel {
override def deriveRowType() = rowRelDataType
@@ -68,20 +68,12 @@ class DataStreamCalc(
calcProgram.getCondition != null)
}
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
val config = tableEnv.getConfig
val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
val generator = new CodeGenerator(config, false, inputDataStream.getType)
val body = functionBody(
@@ -89,14 +81,13 @@ class DataStreamCalc(
inputDataStream.getType,
getRowType,
calcProgram,
- config,
- expectedType)
+ config)
val genFunction = generator.generateFunction(
ruleDescription,
- classOf[FlatMapFunction[Any, Any]],
+ classOf[FlatMapFunction[Row, Row]],
body,
- returnType)
+ FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
val mapFunc = calcMapFunction(genFunction)
inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index bd65954..dd799e6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -24,11 +24,11 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.{RexCall, RexNode}
import org.apache.calcite.sql.SemiJoinType
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.functions.utils.TableSqlFunction
-import org.apache.flink.table.plan.nodes.FlinkCorrelate
-import org.apache.flink.table.typeutils.TypeConverter._
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.types.Row
/**
* Flink RelNode which matches along with join a user defined table function.
@@ -44,7 +44,7 @@ class DataStreamCorrelate(
joinType: SemiJoinType,
ruleDescription: String)
extends SingleRel(cluster, traitSet, inputNode)
- with FlinkCorrelate
+ with CommonCorrelate
with DataStreamRel {
override def deriveRowType() = relRowType
@@ -79,10 +79,7 @@ class DataStreamCorrelate(
.itemIf("condition", condition.orNull, condition.isDefined)
}
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]])
- : DataStream[Any] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
val config = tableEnv.getConfig
@@ -103,7 +100,6 @@ class DataStreamCorrelate(
joinType,
rexCall,
condition,
- expectedType,
Some(pojoFieldMapping),
ruleDescription)
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index 16427b8..6f20831 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -19,10 +19,10 @@
package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.rel.RelNode
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.plan.nodes.FlinkRel
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.FlinkRel
+import org.apache.flink.types.Row
trait DataStreamRel extends RelNode with FlinkRel {
@@ -30,16 +30,9 @@ trait DataStreamRel extends RelNode with FlinkRel {
* Translates the FlinkRelNode into a Flink operator.
*
* @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
- * @param expectedType specifies the type the Flink operator should return. The type must
- * have the same arity as the result. For instance, if the
- * expected type is a RowTypeInfo this method will return a DataSet of
- * type Row. If the expected type is Tuple2, the operator will return
- * a Tuple2 if possible. Row otherwise.
- * @return DataStream of type expectedType or RowTypeInfo
+ * @return DataStream of type [[Row]]
*/
- def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]] = None) : DataStream[Any]
+ def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index 2d5ec09..e8d218e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -21,10 +21,10 @@ package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.plan.schema.DataStreamTable
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.types.Row
/**
* Flink RelNode which matches along with DataStreamSource.
@@ -51,14 +51,12 @@ class DataStreamScan(
)
}
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
val config = tableEnv.getConfig
val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
- convertToExpectedType(inputDataStream, dataStreamTable, expectedType, config)
+ convertToInternalRow(inputDataStream, dataStreamTable, config)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index beb15d2..f676176 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -21,9 +21,9 @@ package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -60,9 +60,7 @@ class DataStreamUnion(
s"Union(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
}
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
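
The hunk ends just before the actual union; presumably the translation completes along these lines now that both sides are statically Row-typed (a sketch, since the remainder lies outside the diff):

    // both inputs are DataStream[Row], so the union needs no type coercion
    leftDataSet.union(rightDataSet)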
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index f2a3d72..0ab4a48 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -24,12 +24,12 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Values
import org.apache.calcite.rex.RexLiteral
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.runtime.io.ValuesInputFormat
-import org.apache.flink.table.typeutils.TypeConverter._
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.io.ValuesInputFormat
+import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -57,18 +57,11 @@ class DataStreamValues(
)
}
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]])
- : DataStream[Any] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
val config = tableEnv.getConfig
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
+ val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
val generator = new CodeGenerator(config)
@@ -86,12 +79,12 @@ class DataStreamValues(
generatedRecords.map(_.code),
returnType)
- val inputFormat = new ValuesInputFormat[Any](
+ val inputFormat = new ValuesInputFormat[Row](
generatedFunction.name,
generatedFunction.code,
generatedFunction.returnType)
- tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataStream[Any]]
+ tableEnv.execEnv.createInput(inputFormat, returnType)
}
}
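
Every node above replaces determineReturnType with FlinkTypeFactory.toInternalRowTypeInfo. A minimal sketch of what that helper is expected to compute, distilled from the deleted TypeConverter logic below rather than copied from the commit:

    import org.apache.calcite.rel.`type`.RelDataType
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.table.calcite.FlinkTypeFactory
    import org.apache.flink.types.Row

    import scala.collection.JavaConverters._

    // assumed behavior: one field type per logical field, always wrapped in Row
    def toInternalRowTypeInfo(logicalRowType: RelDataType): TypeInformation[Row] = {
      val fieldTypes = logicalRowType.getFieldList.asScala
        .map(f => FlinkTypeFactory.toTypeInfo(f.getType))
      new RowTypeInfo(fieldTypes: _*)
    }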
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
index ddac958..56f7f27 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
@@ -19,17 +19,13 @@
package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.plan._
-import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.TableScan
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.PojoTypeInfo
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.plan.schema.FlinkTable
-import org.apache.flink.table.runtime.MapRunner
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonScan
+import org.apache.flink.table.plan.schema.FlinkTable
+import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -39,69 +35,37 @@ abstract class StreamScan(
traitSet: RelTraitSet,
table: RelOptTable)
extends TableScan(cluster, traitSet, table)
+ with CommonScan
with DataStreamRel {
- protected def convertToExpectedType(
+ protected def convertToInternalRow(
input: DataStream[Any],
flinkTable: FlinkTable[_],
- expectedType: Option[TypeInformation[Any]],
- config: TableConfig): DataStream[Any] = {
+ config: TableConfig)
+ : DataStream[Row] = {
val inputType = input.getType
- expectedType match {
-
- // special case:
- // if efficient type usage is enabled and no expected type is set
- // we can simply forward the DataSet to the next operator.
- // however, we cannot forward PojoTypes as their fields don't have an order
- case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
- input
-
- case _ =>
- val determinedType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
+ val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
- // conversion
- if (determinedType != inputType) {
- val generator = new CodeGenerator(
- config,
- nullableInput = false,
- input.getType,
- flinkTable.fieldIndexes)
+ // conversion
+ if (needsConversion(inputType, internalType)) {
- val conversion = generator.generateConverterResultExpression(
- determinedType,
- getRowType.getFieldNames)
+ val mapFunc = getConversionMapper(
+ config,
+ inputType,
+ internalType,
+ "DataStreamSourceConversion",
+ getRowType.getFieldNames,
+ Some(flinkTable.fieldIndexes))
- val body =
- s"""
- |${conversion.code}
- |return ${conversion.resultTerm};
- |""".stripMargin
+ val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
- val genFunction = generator.generateFunction(
- "DataSetSourceConversion",
- classOf[MapFunction[Any, Any]],
- body,
- determinedType)
-
- val mapFunc = new MapRunner[Any, Any](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
-
- val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
- input.map(mapFunc).name(opName)
- }
- // no conversion necessary, forward
- else {
- input
- }
+ input.map(mapFunc).name(opName)
+ }
+ // no conversion necessary, forward
+ else {
+ input.asInstanceOf[DataStream[Row]]
}
}
}
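
The inline code generation moves into the shared CommonScan trait; below is a hedged sketch of the helper pair this method now calls, with bodies inferred from the code they replace (assumptions, not the commit's actual implementation):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.types.Row

    // assumed: convert whenever the physical input type differs from the
    // internal Row type derived from the logical schema
    def needsConversion(
        externalType: TypeInformation[Any],
        internalType: TypeInformation[Row]): Boolean =
      externalType != internalType

    // getConversionMapper then presumably packages a code-generated MapFunction
    // into a MapRunner, exactly as the deleted inline block above did,
    // parameterized by operator name, field names and field indexes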
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 7550593..73d0291 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -21,12 +21,12 @@ package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.plan._
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.schema.TableSourceTable
import org.apache.flink.table.sources.StreamTableSource
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.types.Row
/** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
class StreamTableSourceScan(
@@ -62,15 +62,13 @@ class StreamTableSourceScan(
.item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
}
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
val config = tableEnv.getConfig
val inputDataStream: DataStream[Any] = tableSource
.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
- convertToExpectedType(inputDataStream, new TableSourceTable(tableSource), expectedType, config)
+ convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index e89f14f..034ff9e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -67,9 +67,10 @@ object AggregateUtil {
*
*/
private[flink] def createPrepareMapFunction(
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- groupings: Array[Int],
- inputType: RelDataType): MapFunction[Any, Row] = {
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ groupings: Array[Int],
+ inputType: RelDataType)
+ : MapFunction[Row, Row] = {
val (aggFieldIndexes,aggregates) = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
@@ -83,7 +84,7 @@ object AggregateUtil {
aggregates,
aggFieldIndexes,
groupings,
- mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
+ mapReturnType)
mapFunction
}
@@ -113,11 +114,12 @@ object AggregateUtil {
* NOTE: this function is only used for time based window on batch tables.
*/
def createDataSetWindowPrepareMapFunction(
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- groupings: Array[Int],
- inputType: RelDataType,
- isParserCaseSensitive: Boolean): MapFunction[Any, Row] = {
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ groupings: Array[Int],
+ inputType: RelDataType,
+ isParserCaseSensitive: Boolean)
+ : MapFunction[Row, Row] = {
val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
@@ -147,7 +149,7 @@ object AggregateUtil {
groupings,
timeFieldPos,
tumbleTimeWindowSize,
- mapReturnType).asInstanceOf[MapFunction[Any, Row]]
+ mapReturnType)
}
/**
@@ -159,13 +161,14 @@ object AggregateUtil {
* NOTE: this function is only used for window on batch tables.
*/
def createDataSetWindowAggregationGroupReduceFunction(
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int],
- properties: Seq[NamedWindowProperty],
- isInputCombined: Boolean = false): RichGroupReduceFunction[Row, Row] = {
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int],
+ properties: Seq[NamedWindowProperty],
+ isInputCombined: Boolean = false)
+ : RichGroupReduceFunction[Row, Row] = {
val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
@@ -269,10 +272,11 @@ object AggregateUtil {
*
*/
private[flink] def createDataSetWindowAggregationCombineFunction(
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- groupings: Array[Int]): RichGroupCombineFunction[Row,Row] = {
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ groupings: Array[Int])
+ : RichGroupCombineFunction[Row, Row] = {
val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
@@ -313,11 +317,12 @@ object AggregateUtil {
*
*/
private[flink] def createAggregateGroupReduceFunction(
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int],
- inGroupingSet: Boolean): RichGroupReduceFunction[Row, Row] = {
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int],
+ inGroupingSet: Boolean)
+ : RichGroupReduceFunction[Row, Row] = {
val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
@@ -370,10 +375,11 @@ object AggregateUtil {
*
*/
private[flink] def createIncrementalAggregateReduceFunction(
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int])
+ : IncrementalAggregateReduceFunction = {
val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),inputType,groupings.length)._2
@@ -397,13 +403,13 @@ object AggregateUtil {
* Create an [[AllWindowFunction]] to compute non-partitioned group window aggregates.
*/
private[flink] def createAllWindowAggregationFunction(
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int],
- properties: Seq[NamedWindowProperty])
- : AllWindowFunction[Row, Row, DataStreamWindow] = {
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int],
+ properties: Seq[NamedWindowProperty])
+ : AllWindowFunction[Row, Row, DataStreamWindow] = {
val aggFunction =
createAggregateGroupReduceFunction(
@@ -427,13 +433,13 @@ object AggregateUtil {
*
*/
private[flink] def createWindowAggregationFunction(
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int],
- properties: Seq[NamedWindowProperty])
- : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int],
+ properties: Seq[NamedWindowProperty])
+ : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
val aggFunction =
createAggregateGroupReduceFunction(
@@ -457,12 +463,13 @@ object AggregateUtil {
* window aggregates.
*/
private[flink] def createAllWindowIncrementalAggregationFunction(
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int],
- properties: Seq[NamedWindowProperty]): AllWindowFunction[Row, Row, DataStreamWindow] = {
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int],
+ properties: Seq[NamedWindowProperty])
+ : AllWindowFunction[Row, Row, DataStreamWindow] = {
val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),inputType,groupings.length)._2
@@ -499,12 +506,13 @@ object AggregateUtil {
* Create a [[WindowFunction]] to finalize incrementally pre-computed window aggregates.
*/
private[flink] def createWindowIncrementalAggregationFunction(
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int],
- properties: Seq[NamedWindowProperty]): WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int],
+ properties: Seq[NamedWindowProperty])
+ : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),inputType,groupings.length)._2
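
None of the AggregateUtil hunks change behavior: they tighten MapFunction[Any, Row] to MapFunction[Row, Row], drop the now-unnecessary asInstanceOf casts, and re-indent parameter lists. As a reminder of what the prepare mapper produces, a simplified sketch (the field layout is inferred from the factory methods above; the real mapper also initializes aggregate intermediates):

    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.types.Row

    // simplified: intermediate row = [grouping fields | aggregate input fields]
    class PrepareMap(groupings: Array[Int], aggFields: Array[Int])
      extends MapFunction[Row, Row] {

      override def map(in: Row): Row = {
        val out = new Row(groupings.length + aggFields.length)
        var i = 0
        while (i < groupings.length) {
          out.setField(i, in.getField(groupings(i)))
          i += 1
        }
        var j = 0
        while (j < aggFields.length) {
          out.setField(groupings.length + j, in.getField(aggFields(j)))
          j += 1
        }
        out
      }
    }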
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
deleted file mode 100644
index a2a120b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.typeutils
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rel.core.JoinRelType._
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.calcite.FlinkTypeFactory
-
-import scala.collection.JavaConversions._
-
-object TypeConverter {
-
- val DEFAULT_ROW_TYPE = new RowTypeInfo().asInstanceOf[TypeInformation[Any]]
-
- /**
- * Determines the return type of Flink operators based on the logical fields, the expected
- * physical type and configuration parameters.
- *
- * For example:
- * - No physical type expected, only 3 non-null fields and efficient type usage enabled
- * -> return Tuple3
- * - No physical type expected, efficient type usage enabled, but 3 nullable fields
- * -> return Row because Tuple does not support null values
- * - Physical type expected
- * -> check if physical type is compatible and return it
- *
- * @param logicalRowType logical row information
- * @param expectedPhysicalType expected physical type
- * @param nullable fields can be nullable
- * @param useEfficientTypes use the most efficient types (e.g. Tuples and value types)
- * @return suitable return type
- */
- def determineReturnType(
- logicalRowType: RelDataType,
- expectedPhysicalType: Option[TypeInformation[Any]],
- nullable: Boolean,
- useEfficientTypes: Boolean)
- : TypeInformation[Any] = {
- // convert to type information
- val logicalFieldTypes = logicalRowType.getFieldList map { relDataType =>
- FlinkTypeFactory.toTypeInfo(relDataType.getType)
- }
- // field names
- val logicalFieldNames = logicalRowType.getFieldNames.toList
-
- val returnType = expectedPhysicalType match {
- // a certain physical type is expected (but not Row)
- // check if expected physical type is compatible with logical field type
- case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
- if (typeInfo.getArity != logicalFieldTypes.length) {
- throw new TableException("Arity of result does not match expected type.")
- }
- typeInfo match {
-
- // POJO type expected
- case pt: PojoTypeInfo[_] =>
- logicalFieldNames.zip(logicalFieldTypes) foreach {
- case (fName, fType) =>
- val pojoIdx = pt.getFieldIndex(fName)
- if (pojoIdx < 0) {
- throw new TableException(s"POJO does not define field name: $fName")
- }
- val expectedTypeInfo = pt.getTypeAt(pojoIdx)
- if (fType != expectedTypeInfo) {
- throw new TableException(s"Result field does not match expected type. " +
- s"Expected: $expectedTypeInfo; Actual: $fType")
- }
- }
-
- // Tuple/Case class type expected
- case ct: CompositeType[_] =>
- logicalFieldTypes.zipWithIndex foreach {
- case (fieldTypeInfo, i) =>
- val expectedTypeInfo = ct.getTypeAt(i)
- if (fieldTypeInfo != expectedTypeInfo) {
- throw new TableException(s"Result field does not match expected type. " +
- s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo")
- }
- }
-
- // Atomic type expected
- case at: AtomicType[_] =>
- val fieldTypeInfo = logicalFieldTypes.head
- if (fieldTypeInfo != at) {
- throw new TableException(s"Result field does not match expected type. " +
- s"Expected: $at; Actual: $fieldTypeInfo")
- }
-
- case _ =>
- throw new TableException("Unsupported result type.")
- }
- typeInfo
-
- // Row is expected, create the arity for it
- case Some(typeInfo) if typeInfo.getTypeClass == classOf[Row] =>
- new RowTypeInfo(logicalFieldTypes: _*)
-
- // no physical type
- // determine type based on logical fields and configuration parameters
- case None =>
- // no need for efficient types -> use Row
- // we cannot use efficient types if row arity > tuple arity or nullable
- if (!useEfficientTypes || logicalFieldTypes.length > Tuple.MAX_ARITY || nullable) {
- new RowTypeInfo(logicalFieldTypes: _*)
- }
- // use efficient type tuple or atomic type
- else {
- if (logicalFieldTypes.length == 1) {
- logicalFieldTypes.head
- }
- else {
- new TupleTypeInfo[Tuple](logicalFieldTypes.toArray:_*)
- }
- }
- }
- returnType.asInstanceOf[TypeInformation[Any]]
- }
-
- def sqlJoinTypeToFlinkJoinType(sqlJoinType: JoinRelType): JoinType = sqlJoinType match {
- case INNER => JoinType.INNER
- case LEFT => JoinType.LEFT_OUTER
- case RIGHT => JoinType.RIGHT_OUTER
- case FULL => JoinType.FULL_OUTER
- }
-
- def flinkJoinTypeToRelType(joinType: JoinType) = joinType match {
- case JoinType.INNER => JoinRelType.INNER
- case JoinType.LEFT_OUTER => JoinRelType.LEFT
- case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
- case JoinType.FULL_OUTER => JoinRelType.FULL
- }
-}
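
With this file removed, the three-way negotiation between expected POJO, composite and atomic types versus Row disappears from the translation path entirely. The before/after at a typical call site, condensed from the DataStreamValues hunk above:

    // before: physical type negotiated per call site
    // val returnType = determineReturnType(
    //   getRowType, expectedType, config.getNullCheck, config.getEfficientTypeUsage)

    // after: a single fixed internal type
    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)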