You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/02/13 16:51:22 UTC
[1/3] flink git commit: [FLINK-5662] [table] Rework internal type handling of Table API
Repository: flink
Updated Branches:
refs/heads/master 1ce10c877 -> 6bc6b225e
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index e84c906..dece295 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -56,8 +56,7 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
@Parameterized.Parameters(name = "Table config = {0}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
- { TableProgramsTestBase.DEFAULT() },
- { TableProgramsTestBase.EFFICIENT() }
+ { TableProgramsTestBase.DEFAULT() }
});
}
@@ -265,8 +264,8 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
data.add(new Tuple4<>("Test me", 4, 3.33, "Hello world"));
Table table = tableEnv
- .fromDataSet(env.fromCollection(data), "a, b, c, d")
- .select("a, b, c, d");
+ .fromDataSet(env.fromCollection(data), "q, w, e, r")
+ .select("q as a, w as b, e as c, r as d");
DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
List<SmallPojo2> results = ds.collect();
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
index 2b00cc9..6cbe834 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -261,8 +261,8 @@ object TableEnvironmentITCase {
@Parameterized.Parameters(name = "Table config = {0}")
def parameters(): util.Collection[Array[java.lang.Object]] = {
Seq[Array[AnyRef]](
- Array(TableProgramsTestBase.DEFAULT),
- Array(TableProgramsTestBase.EFFICIENT)).asJava
+ Array(TableProgramsTestBase.DEFAULT)
+ ).asJava
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
index a699068..586d716 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.api.scala.batch.utils
import java.util
import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.{EFFICIENT, NO_NULL, TableConfigMode}
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.{NO_NULL, TableConfigMode}
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.junit.runners.Parameterized
@@ -38,8 +38,6 @@ class TableProgramsTestBase(
tableConfigMode match {
case NO_NULL =>
conf.setNullCheck(false)
- case EFFICIENT =>
- conf.setEfficientTypeUsage(true)
case _ => // keep default
}
conf
@@ -47,11 +45,10 @@ class TableProgramsTestBase(
}
object TableProgramsTestBase {
- case class TableConfigMode(nullCheck: Boolean, efficientTypes: Boolean)
+ case class TableConfigMode(nullCheck: Boolean)
- val DEFAULT = TableConfigMode(nullCheck = true, efficientTypes = false)
- val NO_NULL = TableConfigMode(nullCheck = false, efficientTypes = false)
- val EFFICIENT = TableConfigMode(nullCheck = false, efficientTypes = true)
+ val DEFAULT = TableConfigMode(nullCheck = true)
+ val NO_NULL = TableConfigMode(nullCheck = false)
@Parameterized.Parameters(name = "Table config = {0}")
def parameters(): util.Collection[Array[java.lang.Object]] = {
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
index 8555632..b4327ec 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala
@@ -95,7 +95,7 @@ abstract class ExpressionTestBase {
val generator = new CodeGenerator(config, false, typeInfo)
// cast expressions to String
- val stringTestExprs = testExprs.map(expr => relBuilder.cast(expr._1, VARCHAR)).toSeq
+ val stringTestExprs = testExprs.map(expr => relBuilder.cast(expr._1, VARCHAR))
// generate code
val resultType = new RowTypeInfo(Seq.fill(testExprs.size)(STRING_TYPE_INFO): _*)
@@ -110,16 +110,16 @@ abstract class ExpressionTestBase {
|return ${genExpr.resultTerm};
|""".stripMargin
- val genFunc = generator.generateFunction[MapFunction[Any, String]](
+ val genFunc = generator.generateFunction[MapFunction[Any, Row], Row](
"TestFunction",
- classOf[MapFunction[Any, String]],
+ classOf[MapFunction[Any, Row]],
bodyCode,
- resultType.asInstanceOf[TypeInformation[Any]])
+ resultType)
// compile and evaluate
- val clazz = new TestCompiler[MapFunction[Any, String]]().compile(genFunc)
+ val clazz = new TestCompiler[MapFunction[Any, Row], Row]().compile(genFunc)
val mapper = clazz.newInstance()
- val result = mapper.map(testData).asInstanceOf[Row]
+ val result = mapper.map(testData)
// compare
testExprs
@@ -211,8 +211,8 @@ abstract class ExpressionTestBase {
// ----------------------------------------------------------------------------------------------
// TestCompiler that uses current class loader
- class TestCompiler[T <: Function] extends Compiler[T] {
- def compile(genFunc: GeneratedFunction[T]): Class[T] =
+ class TestCompiler[F <: Function, T <: Any] extends Compiler[F] {
+ def compile(genFunc: GeneratedFunction[F, T]): Class[F] =
compile(getClass.getClassLoader, genFunc.name, genFunc.code)
}
}
[2/3] flink git commit: [FLINK-5662] [table] Rework internal type handling of Table API
Posted by tw...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index 03178ad..245a038 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -22,15 +22,15 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rex._
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.plan.nodes.FlinkCalc
-import org.apache.flink.table.typeutils.TypeConverter
-import TypeConverter._
-import org.apache.calcite.rex._
import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.nodes.CommonCalc
+import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -46,7 +46,7 @@ class DataSetCalc(
private[flink] val calcProgram: RexProgram, // for tests
ruleDescription: String)
extends SingleRel(cluster, traitSet, input)
- with FlinkCalc
+ with CommonCalc
with DataSetRel {
override def deriveRowType() = rowRelDataType
@@ -99,19 +99,13 @@ class DataSetCalc(
}
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val config = tableEnv.getConfig
val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
+ val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
val generator = new CodeGenerator(config, false, inputDS.getType)
@@ -120,12 +114,11 @@ class DataSetCalc(
inputDS.getType,
getRowType,
calcProgram,
- config,
- expectedType)
+ config)
val genFunction = generator.generateFunction(
ruleDescription,
- classOf[FlatMapFunction[Any, Any]],
+ classOf[FlatMapFunction[Row, Row]],
body,
returnType)
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
index 5a75e5d..c18a829 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -28,8 +28,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.functions.utils.TableSqlFunction
-import org.apache.flink.table.plan.nodes.FlinkCorrelate
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.types.Row
/**
* Flink RelNode which matches along with join a user defined table function.
@@ -45,7 +45,7 @@ class DataSetCorrelate(
joinType: SemiJoinType,
ruleDescription: String)
extends SingleRel(cluster, traitSet, inputNode)
- with FlinkCorrelate
+ with CommonCorrelate
with DataSetRel {
override def deriveRowType() = relRowType
@@ -85,10 +85,7 @@ class DataSetCorrelate(
.itemIf("condition", condition.orNull, condition.isDefined)
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]])
- : DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val config = tableEnv.getConfig
@@ -109,7 +106,6 @@ class DataSetCorrelate(
joinType,
rexCall,
condition,
- expectedType,
Some(pojoFieldMapping),
ruleDescription)
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
index 332aa8a..4497df3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala
@@ -22,11 +22,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.runtime.IntersectCoGroupFunction
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -75,55 +74,21 @@ class DataSetIntersect(
}
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
- val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val coGroupedDs = leftDataSet.coGroup(rightDataSet)
val coGroupOpName = s"intersect: ($intersectSelectionToString)"
- val coGroupFunction = new IntersectCoGroupFunction[Any](all)
-
- val intersectDs = coGroupedDs.where("*").equalTo("*")
- .`with`(coGroupFunction).name(coGroupOpName)
-
- val config = tableEnv.getConfig
- val leftType = leftDataSet.getType
-
- // here we only care about left type information, because we emit records from left DataSet
- expectedType match {
- case None if config.getEfficientTypeUsage =>
- intersectDs
-
- case _ =>
- val determinedType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- // conversion
- if (determinedType != leftType) {
- val mapFunc = getConversionMapper(
- config,
- false,
- leftType,
- determinedType,
- "DataSetIntersectConversion",
- getRowType.getFieldNames)
-
- val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
- intersectDs.map(mapFunc).name(opName)
- }
- // no conversion necessary, forward
- else {
- intersectDs
- }
- }
+ val coGroupFunction = new IntersectCoGroupFunction[Row](all)
+
+ coGroupedDs
+ .where("*")
+ .equalTo("*")
+ .`with`(coGroupFunction)
+ .name(coGroupOpName)
}
private def intersectSelectionToString: String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
index edb5be2..e6f8ca4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala
@@ -23,17 +23,16 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
+import org.apache.calcite.rex.RexNode
import org.apache.calcite.util.mapping.IntPair
+import org.apache.flink.api.common.functions.FlatJoinFunction
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.runtime.FlatJoinRunner
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
-import org.apache.flink.api.common.functions.FlatJoinFunction
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.calcite.sql.SqlKind
-import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
+import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -102,17 +101,11 @@ class DataSetJoin(
planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val config = tableEnv.getConfig
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
+ val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
// get the equality keys
val leftKeys = ArrayBuffer.empty[Int]
@@ -195,19 +188,22 @@ class DataSetJoin(
}
val genFunction = generator.generateFunction(
ruleDescription,
- classOf[FlatJoinFunction[Any, Any, Any]],
+ classOf[FlatJoinFunction[Row, Row, Row]],
body,
returnType)
- val joinFun = new FlatJoinRunner[Any, Any, Any](
+ val joinFun = new FlatJoinRunner[Row, Row, Row](
genFunction.name,
genFunction.code,
genFunction.returnType)
val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)"
- joinOperator.where(leftKeys.toArray: _*).equalTo(rightKeys.toArray: _*)
- .`with`(joinFun).name(joinOpName).asInstanceOf[DataSet[Any]]
+ joinOperator
+ .where(leftKeys.toArray: _*)
+ .equalTo(rightKeys.toArray: _*)
+ .`with`(joinFun)
+ .name(joinOpName)
}
private def joinSelectionToString: String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
index 672ff9c..9ba65bf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala
@@ -22,11 +22,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.runtime.MinusCoGroupFunction
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -86,55 +85,21 @@ class DataSetMinus(
rowCnt
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
- val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val coGroupedDs = leftDataSet.coGroup(rightDataSet)
val coGroupOpName = s"minus: ($minusSelectionToString)"
- val coGroupFunction = new MinusCoGroupFunction[Any](all)
-
- val minusDs = coGroupedDs.where("*").equalTo("*")
- .`with`(coGroupFunction).name(coGroupOpName)
-
- val config = tableEnv.getConfig
- val leftType = leftDataSet.getType
-
- // here we only care about left type information, because we emit records from left DataSet
- expectedType match {
- case None if config.getEfficientTypeUsage =>
- minusDs
-
- case _ =>
- val determinedType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- // conversion
- if (determinedType != leftType) {
- val mapFunc = getConversionMapper(
- config = config,
- nullableInput = false,
- inputType = leftType,
- expectedType = determinedType,
- conversionOperatorName = "DataSetMinusConversion",
- fieldNames = getRowType.getFieldNames)
-
- val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
- minusDs.map(mapFunc).name(opName)
- }
- // no conversion necessary, forward
- else {
- minusDs
- }
- }
+ val coGroupFunction = new MinusCoGroupFunction[Row](all)
+
+ coGroupedDs
+ .where("*")
+ .equalTo("*")
+ .`with`(coGroupFunction)
+ .name(coGroupOpName)
}
private def minusSelectionToString: String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
index 02138cf..980f3cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetRel.scala
@@ -19,26 +19,19 @@
package org.apache.flink.table.plan.nodes.dataset
import org.apache.calcite.rel.RelNode
+import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.plan.nodes.FlinkRel
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.DataSet
+import org.apache.flink.types.Row
trait DataSetRel extends RelNode with FlinkRel {
/**
* Translates the [[DataSetRel]] node into a [[DataSet]] operator.
*
- * @param tableEnv [[BatchTableEnvironment]] of the translated Table.
- * @param expectedType specifies the type the Flink operator should return. The type must
- * have the same arity as the result. For instance, if the
- * expected type is a RowTypeInfo this method will return a DataSet of
- * type Row. If the expected type is Tuple2, the operator will return
- * a Tuple2 if possible. Row otherwise.
- * @return DataSet of type expectedType or RowTypeInfo
+ * @param tableEnv The [[BatchTableEnvironment]] of the translated Table.
+ * @return DataSet of type [[Row]]
*/
- def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]] = None) : DataSet[Any]
+ def translateToPlan(tableEnv: BatchTableEnvironment) : DataSet[Row]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
index 48bbb74..44d2d00 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
@@ -21,10 +21,10 @@ package org.apache.flink.table.plan.nodes.dataset
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.plan.schema.DataSetTable
+import org.apache.flink.types.Row
/**
* Flink RelNode which matches along with DataSource.
@@ -51,14 +51,12 @@ class DataSetScan(
)
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val config = tableEnv.getConfig
val inputDataSet: DataSet[Any] = dataSetTable.dataSet
- convertToExpectedType(inputDataSet, dataSetTable, expectedType, config)
+ convertToInternalRow(inputDataSet, dataSetTable, config)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
index a70b4ab..b7d1a4b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala
@@ -26,10 +26,11 @@ import org.apache.calcite.rex.RexNode
import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.runtime.{MapJoinLeftRunner, MapJoinRightRunner}
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
-import org.apache.flink.table.api.{BatchTableEnvironment, TableConfig}
+import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -87,9 +88,7 @@ class DataSetSingleRowJoin(
planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize)
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
@@ -100,8 +99,7 @@ class DataSetSingleRowJoin(
rightDataSet.getType,
leftIsSingle,
joinCondition,
- broadcastSetName,
- expectedType)
+ broadcastSetName)
val (multiRowDataSet, singleRowDataSet) =
if (leftIsSingle) {
@@ -114,17 +112,16 @@ class DataSetSingleRowJoin(
.flatMap(mapSideJoin)
.withBroadcastSet(singleRowDataSet, broadcastSetName)
.name(getMapOperatorName)
- .asInstanceOf[DataSet[Any]]
}
private def generateMapFunction(
config: TableConfig,
- inputType1: TypeInformation[Any],
- inputType2: TypeInformation[Any],
+ inputType1: TypeInformation[Row],
+ inputType2: TypeInformation[Row],
firstIsSingle: Boolean,
joinCondition: RexNode,
- broadcastInputSetName: String,
- expectedType: Option[TypeInformation[Any]]): FlatMapFunction[Any, Any] = {
+ broadcastInputSetName: String)
+ : FlatMapFunction[Row, Row] = {
val codeGenerator = new CodeGenerator(
config,
@@ -132,11 +129,7 @@ class DataSetSingleRowJoin(
inputType1,
Some(inputType2))
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
+ val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
val conversion = codeGenerator.generateConverterResultExpression(
returnType,
@@ -144,28 +137,29 @@ class DataSetSingleRowJoin(
val condition = codeGenerator.generateExpression(joinCondition)
- val joinMethodBody = s"""
- |${condition.code}
- |if (${condition.resultTerm}) {
- | ${conversion.code}
- | ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
- |}
- |""".stripMargin
+ val joinMethodBody =
+ s"""
+ |${condition.code}
+ |if (${condition.resultTerm}) {
+ | ${conversion.code}
+ | ${codeGenerator.collectorTerm}.collect(${conversion.resultTerm});
+ |}
+ |""".stripMargin
val genFunction = codeGenerator.generateFunction(
ruleDescription,
- classOf[FlatJoinFunction[Any, Any, Any]],
+ classOf[FlatJoinFunction[Row, Row, Row]],
joinMethodBody,
returnType)
if (firstIsSingle) {
- new MapJoinRightRunner[Any, Any, Any](
+ new MapJoinRightRunner[Row, Row, Row](
genFunction.name,
genFunction.code,
genFunction.returnType,
broadcastInputSetName)
} else {
- new MapJoinLeftRunner[Any, Any, Any](
+ new MapJoinLeftRunner[Row, Row, Row](
genFunction.name,
genFunction.code,
genFunction.returnType,
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
index 4d84730..192237a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala
@@ -27,11 +27,10 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelCollation, RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.{RexLiteral, RexNode}
import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.{BatchTableEnvironment, TableException}
import org.apache.flink.table.runtime.{CountPartitionFunction, LimitFilterFunction}
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -87,10 +86,7 @@ class DataSetSort(
}
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]] = None)
- : DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
if (fieldCollations.isEmpty) {
throw TableException("Limiting the result without sorting is not allowed " +
@@ -113,10 +109,10 @@ class DataSetSort(
partitionedDs = partitionedDs.sortPartition(fieldCollation._1, fieldCollation._2)
}
- val limitedDs = if (offset == null && fetch == null) {
+ if (offset == null && fetch == null) {
partitionedDs
} else {
- val countFunction = new CountPartitionFunction[Any]
+ val countFunction = new CountPartitionFunction[Row]
val partitionCountName = s"prepare offset/fetch"
@@ -126,7 +122,7 @@ class DataSetSort(
val broadcastName = "countPartition"
- val limitFunction = new LimitFilterFunction[Any](
+ val limitFunction = new LimitFilterFunction[Row](
limitStart,
limitEnd,
broadcastName)
@@ -138,41 +134,6 @@ class DataSetSort(
.name(limitName)
.withBroadcastSet(partitionCount, broadcastName)
}
-
- val inputType = partitionedDs.getType
- expectedType match {
-
- case None if config.getEfficientTypeUsage =>
- limitedDs
-
- case _ =>
- val determinedType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- // conversion
- if (determinedType != inputType) {
-
- val mapFunc = getConversionMapper(
- config = config,
- nullableInput = false,
- inputType = partitionedDs.getType,
- expectedType = determinedType,
- conversionOperatorName = "DataSetSortConversion",
- fieldNames = getRowType.getFieldNames.asScala
- )
-
- val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
- limitedDs.map(mapFunc).name(opName)
- }
- // no conversion necessary, forward
- else {
- limitedDs
- }
- }
}
private def directionToOrder(direction: Direction) = {
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
index b0c95b5..a87c6e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
@@ -22,9 +22,9 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTra
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -77,24 +77,12 @@ class DataSetUnion(
getInputs.foldLeft(0.0)(_ + mq.getRowCount(_))
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
-
- var leftDataSet: DataSet[Any] = null
- var rightDataSet: DataSet[Any] = null
-
- expectedType match {
- case None =>
- leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- rightDataSet =
- right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
- case _ =>
- leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
- rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
- }
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+
+ val leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+ val rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- leftDataSet.union(rightDataSet).asInstanceOf[DataSet[Any]]
+ leftDataSet.union(rightDataSet)
}
private def unionSelectionToString: String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
index e0282f2..3ebee2c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
@@ -24,12 +24,12 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Values
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rex.RexLiteral
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.runtime.io.ValuesInputFormat
-import org.apache.flink.table.typeutils.TypeConverter._
+import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -66,17 +66,11 @@ class DataSetValues(
super.explainTerms(pw).item("values", valuesFieldsToString)
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val config = tableEnv.getConfig
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
+ val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
val generator = new CodeGenerator(config)
@@ -94,12 +88,12 @@ class DataSetValues(
generatedRecords.map(_.code),
returnType)
- val inputFormat = new ValuesInputFormat[Any](
+ val inputFormat = new ValuesInputFormat[Row](
generatedFunction.name,
generatedFunction.code,
generatedFunction.returnType)
- tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataSet[Any]]
+ tableEnv.execEnv.createInput(inputFormat, returnType)
}
private def valuesFieldsToString: String = {
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
index b165afa..48de822 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
@@ -23,21 +23,17 @@ import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.plan.nodes.CommonAggregate
import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.typeutils.TypeConverter
import org.apache.flink.types.Row
-import scala.collection.JavaConversions._
-
/**
* Flink RelNode which matches along with a LogicalWindowAggregate.
*/
@@ -52,7 +48,7 @@ class DataSetWindowAggregate(
inputType: RelDataType,
grouping: Array[Int])
extends SingleRel(cluster, traitSet, inputNode)
- with FlinkAggregate
+ with CommonAggregate
with DataSetRel {
override def deriveRowType() = rowRelDataType
@@ -109,20 +105,15 @@ class DataSetWindowAggregate(
planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val config = tableEnv.getConfig
- val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
- tableEnv,
- // tell the input operator that this operator currently only supports Rows as input
- Some(TypeConverter.DEFAULT_ROW_TYPE))
+ val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
// whether identifiers are matched case-sensitively
val caseSensitive = tableEnv.getFrameworkConfig.getParserConfig.caseSensitive()
- val result = window match {
+ window match {
case EventTimeTumblingGroupWindow(_, _, size) =>
createEventTimeTumblingWindowDataSet(
inputDS,
@@ -139,31 +130,14 @@ class DataSetWindowAggregate(
"windows in a batch environment must declare a time attribute over which " +
"the query is evaluated.")
}
-
- // if the expected type is not a Row, inject a mapper to convert to the expected type
- expectedType match {
- case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
- val mapName = s"convert: (${getRowType.getFieldNames.toList.mkString(", ")})"
- result.map(
- getConversionMapper(
- config = config,
- nullableInput = false,
- inputType = resultRowTypeInfo.asInstanceOf[TypeInformation[Any]],
- expectedType = expectedType.get,
- conversionOperatorName = "DataSetWindowAggregateConversion",
- fieldNames = getRowType.getFieldNames
- ))
- .name(mapName)
- case _ => result
- }
}
private def createEventTimeTumblingWindowDataSet(
- inputDS: DataSet[Any],
+ inputDS: DataSet[Row],
isTimeWindow: Boolean,
isParserCaseSensitive: Boolean)
- : DataSet[Any] = {
+ : DataSet[Row] = {
val mapFunction = createDataSetWindowPrepareMapFunction(
window,
namedAggregates,
@@ -182,6 +156,8 @@ class DataSetWindowAggregate(
.map(mapFunction)
.name(prepareOperatorName)
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+
val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
if (isTimeWindow) {
// grouped time window aggregation
@@ -190,9 +166,8 @@ class DataSetWindowAggregate(
mappedInput.asInstanceOf[DataSet[Row]]
.groupBy(groupingKeys: _*)
.reduceGroup(groupReduceFunction)
- .returns(resultRowTypeInfo)
+ .returns(rowTypeInfo)
.name(aggregateOperatorName)
- .asInstanceOf[DataSet[Any]]
} else {
// count window
val groupingKeys = grouping.indices.toArray
@@ -203,10 +178,8 @@ class DataSetWindowAggregate(
// sort on time field, it's the last element in the row
.sortGroup(mapReturnType.getArity - 1, Order.ASCENDING)
.reduceGroup(groupReduceFunction)
- .returns(resultRowTypeInfo)
+ .returns(rowTypeInfo)
.name(aggregateOperatorName)
- .asInstanceOf[DataSet[Any]]
-
} else {
// TODO: count tumbling all window on event-time should sort all the data set
// on event time before applying the windowing logic.
@@ -217,11 +190,12 @@ class DataSetWindowAggregate(
}
private[this] def createEventTimeSessionWindowDataSet(
- inputDS: DataSet[Any],
- isParserCaseSensitive: Boolean): DataSet[Any] = {
+ inputDS: DataSet[Row],
+ isParserCaseSensitive: Boolean)
+ : DataSet[Row] = {
val groupingKeys = grouping.indices.toArray
- val rowTypeInfo = resultRowTypeInfo
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
// grouping window
if (groupingKeys.length > 0) {
@@ -280,7 +254,6 @@ class DataSetWindowAggregate(
.reduceGroup(groupReduceFunction)
.returns(rowTypeInfo)
.name(aggregateOperatorName)
- .asInstanceOf[DataSet[Any]]
}
// do non-incremental aggregation
else {
@@ -298,7 +271,6 @@ class DataSetWindowAggregate(
.reduceGroup(groupReduceFunction)
.returns(rowTypeInfo)
.name(aggregateOperatorName)
- .asInstanceOf[DataSet[Any]]
}
}
// non-grouping window
@@ -332,12 +304,4 @@ class DataSetWindowAggregate(
s"window: ($window), select: ($aggString)"
}
}
-
- private def resultRowTypeInfo: RowTypeInfo = {
- // get the output types
- val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList
- .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
- .toArray
- new RowTypeInfo(fieldTypes: _*)
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
index 6a3d4e3..c21d008 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
@@ -22,27 +22,23 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.types.Row
-import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.expressions._
import org.apache.flink.table.plan.logical._
-import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.plan.nodes.CommonAggregate
import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate._
import org.apache.flink.table.runtime.aggregate.AggregateUtil._
import org.apache.flink.table.runtime.aggregate._
import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeConverter}
-import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
-import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.StreamTableEnvironment
-
-import scala.collection.JavaConverters._
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+import org.apache.flink.types.Row
class DataStreamAggregate(
window: LogicalWindow,
@@ -55,7 +51,7 @@ class DataStreamAggregate(
inputType: RelDataType,
grouping: Array[Int])
extends SingleRel(cluster, traitSet, inputNode)
- with FlinkAggregate
+ with CommonAggregate
with DataStreamRel {
override def deriveRowType(): RelDataType = rowRelDataType
@@ -103,24 +99,12 @@ class DataStreamAggregate(
namedProperties))
}
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
- val config = tableEnv.getConfig
val groupingKeys = grouping.indices.toArray
- val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(
- tableEnv,
- // tell the input operator that this operator currently only supports Rows as input
- Some(TypeConverter.DEFAULT_ROW_TYPE))
-
- // get the output types
- val fieldTypes: Array[TypeInformation[_]] =
- getRowType.getFieldList.asScala
- .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
- .toArray
+ val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
- val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
val aggString = aggregationToString(
inputType,
@@ -142,121 +126,100 @@ class DataStreamAggregate(
val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
- val result: DataStream[Any] = {
- // check whether all aggregates support partial aggregate
- if (AggregateUtil.doAllSupportPartialAggregation(
- namedAggregates.map(_.getKey),
- inputType,
- grouping.length)) {
- // do Incremental Aggregation
- val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
+
+ // check whether all aggregates support partial aggregate
+ if (AggregateUtil.doAllSupportPartialAggregation(
+ namedAggregates.map(_.getKey),
+ inputType,
+ grouping.length)) {
+ // do Incremental Aggregation
+ val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
+ namedAggregates,
+ inputType,
+ getRowType,
+ grouping)
+ // grouped / keyed aggregation
+ if (groupingKeys.length > 0) {
+ val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
+ window,
namedAggregates,
inputType,
- getRowType,
- grouping)
- // grouped / keyed aggregation
- if (groupingKeys.length > 0) {
- val windowFunction = AggregateUtil.createWindowIncrementalAggregationFunction(
- window,
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping,
- namedProperties)
-
- val keyedStream = mappedInput.keyBy(groupingKeys: _*)
- val windowedStream =
- createKeyedWindowedStream(window, keyedStream)
- .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
- windowedStream
- .apply(reduceFunction, windowFunction)
- .returns(rowTypeInfo)
- .name(keyedAggOpName)
- .asInstanceOf[DataStream[Any]]
- }
- // global / non-keyed aggregation
- else {
- val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
- window,
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping,
- namedProperties)
-
- val windowedStream =
- createNonKeyedWindowedStream(window, mappedInput)
- .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
- windowedStream
- .apply(reduceFunction, windowFunction)
- .returns(rowTypeInfo)
- .name(nonKeyedAggOpName)
- .asInstanceOf[DataStream[Any]]
- }
+ rowRelDataType,
+ grouping,
+ namedProperties)
+
+ val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+ val windowedStream =
+ createKeyedWindowedStream(window, keyedStream)
+ .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+ windowedStream
+ .reduce(reduceFunction, windowFunction)
+ .returns(rowTypeInfo)
+ .name(keyedAggOpName)
}
+ // global / non-keyed aggregation
else {
- // do non-Incremental Aggregation
- // grouped / keyed aggregation
- if (groupingKeys.length > 0) {
-
- val windowFunction = AggregateUtil.createWindowAggregationFunction(
- window,
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping,
- namedProperties)
-
- val keyedStream = mappedInput.keyBy(groupingKeys: _*)
- val windowedStream =
- createKeyedWindowedStream(window, keyedStream)
- .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
- windowedStream
- .apply(windowFunction)
- .returns(rowTypeInfo)
- .name(keyedAggOpName)
- .asInstanceOf[DataStream[Any]]
- }
- // global / non-keyed aggregation
- else {
- val windowFunction = AggregateUtil.createAllWindowAggregationFunction(
- window,
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping,
- namedProperties)
-
- val windowedStream =
- createNonKeyedWindowedStream(window, mappedInput)
- .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
- windowedStream
- .apply(windowFunction)
- .returns(rowTypeInfo)
- .name(nonKeyedAggOpName)
- .asInstanceOf[DataStream[Any]]
- }
+ val windowFunction = AggregateUtil.createAllWindowIncrementalAggregationFunction(
+ window,
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ grouping,
+ namedProperties)
+
+ val windowedStream =
+ createNonKeyedWindowedStream(window, mappedInput)
+ .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+ windowedStream
+ .reduce(reduceFunction, windowFunction)
+ .returns(rowTypeInfo)
+ .name(nonKeyedAggOpName)
}
}
+ else {
+ // do non-Incremental Aggregation
+ // grouped / keyed aggregation
+ if (groupingKeys.length > 0) {
+
+ val windowFunction = AggregateUtil.createWindowAggregationFunction(
+ window,
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ grouping,
+ namedProperties)
+
+ val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+ val windowedStream =
+ createKeyedWindowedStream(window, keyedStream)
+ .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
- // if the expected type is not a Row, inject a mapper to convert to the expected type
- expectedType match {
- case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
- val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
- result.map(getConversionMapper(
- config = config,
- nullableInput = false,
- inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
- expectedType = expectedType.get,
- conversionOperatorName = "DataStreamAggregateConversion",
- fieldNames = getRowType.getFieldNames.asScala
- ))
- .name(mapName)
- case _ => result
+ windowedStream
+ .apply(windowFunction)
+ .returns(rowTypeInfo)
+ .name(keyedAggOpName)
+ }
+ // global / non-keyed aggregation
+ else {
+ val windowFunction = AggregateUtil.createAllWindowAggregationFunction(
+ window,
+ namedAggregates,
+ inputType,
+ rowRelDataType,
+ grouping,
+ namedProperties)
+
+ val windowedStream =
+ createNonKeyedWindowedStream(window, mappedInput)
+ .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+ windowedStream
+ .apply(windowFunction)
+ .returns(rowTypeInfo)
+ .name(nonKeyedAggOpName)
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index 43f1fb6..b39ae4a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -22,13 +22,13 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.RexProgram
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.plan.nodes.FlinkCalc
-import org.apache.flink.table.typeutils.TypeConverter._
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.nodes.CommonCalc
+import org.apache.flink.types.Row
/**
* Flink RelNode which matches along with FlatMapOperator.
@@ -42,7 +42,7 @@ class DataStreamCalc(
private[flink] val calcProgram: RexProgram,
ruleDescription: String)
extends SingleRel(cluster, traitSet, input)
- with FlinkCalc
+ with CommonCalc
with DataStreamRel {
override def deriveRowType() = rowRelDataType
@@ -68,20 +68,12 @@ class DataStreamCalc(
calcProgram.getCondition != null)
}
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
val config = tableEnv.getConfig
val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
val generator = new CodeGenerator(config, false, inputDataStream.getType)
val body = functionBody(
@@ -89,14 +81,13 @@ class DataStreamCalc(
inputDataStream.getType,
getRowType,
calcProgram,
- config,
- expectedType)
+ config)
val genFunction = generator.generateFunction(
ruleDescription,
- classOf[FlatMapFunction[Any, Any]],
+ classOf[FlatMapFunction[Row, Row]],
body,
- returnType)
+ FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
val mapFunc = calcMapFunction(genFunction)
inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index bd65954..dd799e6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -24,11 +24,11 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.{RexCall, RexNode}
import org.apache.calcite.sql.SemiJoinType
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.functions.utils.TableSqlFunction
-import org.apache.flink.table.plan.nodes.FlinkCorrelate
-import org.apache.flink.table.typeutils.TypeConverter._
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.plan.nodes.CommonCorrelate
+import org.apache.flink.types.Row
/**
* Flink RelNode which matches along with join a user defined table function.
@@ -44,7 +44,7 @@ class DataStreamCorrelate(
joinType: SemiJoinType,
ruleDescription: String)
extends SingleRel(cluster, traitSet, inputNode)
- with FlinkCorrelate
+ with CommonCorrelate
with DataStreamRel {
override def deriveRowType() = relRowType
@@ -79,10 +79,7 @@ class DataStreamCorrelate(
.itemIf("condition", condition.orNull, condition.isDefined)
}
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]])
- : DataStream[Any] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
val config = tableEnv.getConfig
@@ -103,7 +100,6 @@ class DataStreamCorrelate(
joinType,
rexCall,
condition,
- expectedType,
Some(pojoFieldMapping),
ruleDescription)
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index 16427b8..6f20831 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -19,10 +19,10 @@
package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.rel.RelNode
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.plan.nodes.FlinkRel
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.FlinkRel
+import org.apache.flink.types.Row
trait DataStreamRel extends RelNode with FlinkRel {
@@ -30,16 +30,9 @@ trait DataStreamRel extends RelNode with FlinkRel {
* Translates the FlinkRelNode into a Flink operator.
*
* @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
- * @param expectedType specifies the type the Flink operator should return. The type must
- * have the same arity as the result. For instance, if the
- * expected type is a RowTypeInfo this method will return a DataSet of
- * type Row. If the expected type is Tuple2, the operator will return
- * a Tuple2 if possible. Row otherwise.
- * @return DataStream of type expectedType or RowTypeInfo
+ * @return DataStream of type [[Row]]
*/
- def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]] = None) : DataStream[Any]
+ def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row]
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index 2d5ec09..e8d218e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -21,10 +21,10 @@ package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.plan.schema.DataStreamTable
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.types.Row
/**
* Flink RelNode which matches along with DataStreamSource.
@@ -51,14 +51,12 @@ class DataStreamScan(
)
}
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
val config = tableEnv.getConfig
val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
- convertToExpectedType(inputDataStream, dataStreamTable, expectedType, config)
+ convertToInternalRow(inputDataStream, dataStreamTable, config)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index beb15d2..f676176 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -21,9 +21,9 @@ package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -60,9 +60,7 @@ class DataStreamUnion(
s"Union(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
}
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index f2a3d72..0ab4a48 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -24,12 +24,12 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Values
import org.apache.calcite.rex.RexLiteral
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.runtime.io.ValuesInputFormat
-import org.apache.flink.table.typeutils.TypeConverter._
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.io.ValuesInputFormat
+import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -57,18 +57,11 @@ class DataStreamValues(
)
}
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]])
- : DataStream[Any] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
val config = tableEnv.getConfig
- val returnType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
+ val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
val generator = new CodeGenerator(config)
@@ -86,12 +79,12 @@ class DataStreamValues(
generatedRecords.map(_.code),
returnType)
- val inputFormat = new ValuesInputFormat[Any](
+ val inputFormat = new ValuesInputFormat[Row](
generatedFunction.name,
generatedFunction.code,
generatedFunction.returnType)
- tableEnv.execEnv.createInput(inputFormat, returnType).asInstanceOf[DataStream[Any]]
+ tableEnv.execEnv.createInput(inputFormat, returnType)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
index ddac958..56f7f27 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
@@ -19,17 +19,13 @@
package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.plan._
-import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.TableScan
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.PojoTypeInfo
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.plan.schema.FlinkTable
-import org.apache.flink.table.runtime.MapRunner
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonScan
+import org.apache.flink.table.plan.schema.FlinkTable
+import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -39,69 +35,37 @@ abstract class StreamScan(
traitSet: RelTraitSet,
table: RelOptTable)
extends TableScan(cluster, traitSet, table)
+ with CommonScan
with DataStreamRel {
- protected def convertToExpectedType(
+ protected def convertToInternalRow(
input: DataStream[Any],
flinkTable: FlinkTable[_],
- expectedType: Option[TypeInformation[Any]],
- config: TableConfig): DataStream[Any] = {
+ config: TableConfig)
+ : DataStream[Row] = {
val inputType = input.getType
- expectedType match {
-
- // special case:
- // if efficient type usage is enabled and no expected type is set
- // we can simply forward the DataSet to the next operator.
- // however, we cannot forward PojoTypes as their fields don't have an order
- case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
- input
-
- case _ =>
- val determinedType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
+ val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
- // conversion
- if (determinedType != inputType) {
- val generator = new CodeGenerator(
- config,
- nullableInput = false,
- input.getType,
- flinkTable.fieldIndexes)
+ // conversion
+ if (needsConversion(inputType, internalType)) {
- val conversion = generator.generateConverterResultExpression(
- determinedType,
- getRowType.getFieldNames)
+ val mapFunc = getConversionMapper(
+ config,
+ inputType,
+ internalType,
+ "DataStreamSourceConversion",
+ getRowType.getFieldNames,
+ Some(flinkTable.fieldIndexes))
- val body =
- s"""
- |${conversion.code}
- |return ${conversion.resultTerm};
- |""".stripMargin
+ val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
- val genFunction = generator.generateFunction(
- "DataSetSourceConversion",
- classOf[MapFunction[Any, Any]],
- body,
- determinedType)
-
- val mapFunc = new MapRunner[Any, Any](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
-
- val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
- input.map(mapFunc).name(opName)
- }
- // no conversion necessary, forward
- else {
- input
- }
+ input.map(mapFunc).name(opName)
+ }
+ // no conversion necessary, forward
+ else {
+ input.asInstanceOf[DataStream[Row]]
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 7550593..73d0291 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -21,12 +21,12 @@ package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.plan._
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.schema.TableSourceTable
import org.apache.flink.table.sources.StreamTableSource
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.types.Row
/** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
class StreamTableSourceScan(
@@ -62,15 +62,13 @@ class StreamTableSourceScan(
.item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
}
- override def translateToPlan(
- tableEnv: StreamTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
val config = tableEnv.getConfig
val inputDataStream: DataStream[Any] = tableSource
.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
- convertToExpectedType(inputDataStream, new TableSourceTable(tableSource), expectedType, config)
+ convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index e89f14f..034ff9e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -67,9 +67,10 @@ object AggregateUtil {
*
*/
private[flink] def createPrepareMapFunction(
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- groupings: Array[Int],
- inputType: RelDataType): MapFunction[Any, Row] = {
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ groupings: Array[Int],
+ inputType: RelDataType)
+ : MapFunction[Row, Row] = {
val (aggFieldIndexes,aggregates) = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
@@ -83,7 +84,7 @@ object AggregateUtil {
aggregates,
aggFieldIndexes,
groupings,
- mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
+ mapReturnType)
mapFunction
}
@@ -113,11 +114,12 @@ object AggregateUtil {
* NOTE: this function is only used for time based window on batch tables.
*/
def createDataSetWindowPrepareMapFunction(
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- groupings: Array[Int],
- inputType: RelDataType,
- isParserCaseSensitive: Boolean): MapFunction[Any, Row] = {
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ groupings: Array[Int],
+ inputType: RelDataType,
+ isParserCaseSensitive: Boolean)
+ : MapFunction[Row, Row] = {
val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
@@ -147,7 +149,7 @@ object AggregateUtil {
groupings,
timeFieldPos,
tumbleTimeWindowSize,
- mapReturnType).asInstanceOf[MapFunction[Any, Row]]
+ mapReturnType)
}
/**
@@ -159,13 +161,14 @@ object AggregateUtil {
* NOTE: this function is only used for window on batch tables.
*/
def createDataSetWindowAggregationGroupReduceFunction(
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int],
- properties: Seq[NamedWindowProperty],
- isInputCombined: Boolean = false): RichGroupReduceFunction[Row, Row] = {
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int],
+ properties: Seq[NamedWindowProperty],
+ isInputCombined: Boolean = false)
+ : RichGroupReduceFunction[Row, Row] = {
val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
@@ -269,10 +272,11 @@ object AggregateUtil {
*
*/
private[flink] def createDataSetWindowAggregationCombineFunction(
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- groupings: Array[Int]): RichGroupCombineFunction[Row,Row] = {
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ groupings: Array[Int])
+ : RichGroupCombineFunction[Row,Row] = {
val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
@@ -313,11 +317,12 @@ object AggregateUtil {
*
*/
private[flink] def createAggregateGroupReduceFunction(
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int],
- inGroupingSet: Boolean): RichGroupReduceFunction[Row, Row] = {
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int],
+ inGroupingSet: Boolean)
+ : RichGroupReduceFunction[Row, Row] = {
val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
@@ -370,10 +375,11 @@ object AggregateUtil {
*
*/
private[flink] def createIncrementalAggregateReduceFunction(
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int])
+ : IncrementalAggregateReduceFunction = {
val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),inputType,groupings.length)._2
@@ -397,13 +403,13 @@ object AggregateUtil {
* Create an [[AllWindowFunction]] to compute non-partitioned group window aggregates.
*/
private[flink] def createAllWindowAggregationFunction(
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int],
- properties: Seq[NamedWindowProperty])
- : AllWindowFunction[Row, Row, DataStreamWindow] = {
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int],
+ properties: Seq[NamedWindowProperty])
+ : AllWindowFunction[Row, Row, DataStreamWindow] = {
val aggFunction =
createAggregateGroupReduceFunction(
@@ -427,13 +433,13 @@ object AggregateUtil {
*
*/
private[flink] def createWindowAggregationFunction(
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int],
- properties: Seq[NamedWindowProperty])
- : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int],
+ properties: Seq[NamedWindowProperty])
+ : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
val aggFunction =
createAggregateGroupReduceFunction(
@@ -457,12 +463,13 @@ object AggregateUtil {
* window aggregates.
*/
private[flink] def createAllWindowIncrementalAggregationFunction(
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int],
- properties: Seq[NamedWindowProperty]): AllWindowFunction[Row, Row, DataStreamWindow] = {
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int],
+ properties: Seq[NamedWindowProperty])
+ : AllWindowFunction[Row, Row, DataStreamWindow] = {
val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),inputType,groupings.length)._2
@@ -499,12 +506,13 @@ object AggregateUtil {
* Create a [[WindowFunction]] to finalize incrementally pre-computed window aggregates.
*/
private[flink] def createWindowIncrementalAggregationFunction(
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int],
- properties: Seq[NamedWindowProperty]): WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int],
+ properties: Seq[NamedWindowProperty])
+ : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
val aggregates = transformToAggregateFunctions(
namedAggregates.map(_.getKey),inputType,groupings.length)._2
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
deleted file mode 100644
index a2a120b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeConverter.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.typeutils
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rel.core.JoinRelType._
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.calcite.FlinkTypeFactory
-
-import scala.collection.JavaConversions._
-
-object TypeConverter {
-
- val DEFAULT_ROW_TYPE = new RowTypeInfo().asInstanceOf[TypeInformation[Any]]
-
- /**
- * Determines the return type of Flink operators based on the logical fields, the expected
- * physical type and configuration parameters.
- *
- * For example:
- * - No physical type expected, only 3 non-null fields and efficient type usage enabled
- * -> return Tuple3
- * - No physical type expected, efficient type usage enabled, but 3 nullable fields
- * -> return Row because Tuple does not support null values
- * - Physical type expected
- * -> check if physical type is compatible and return it
- *
- * @param logicalRowType logical row information
- * @param expectedPhysicalType expected physical type
- * @param nullable fields can be nullable
- * @param useEfficientTypes use the most efficient types (e.g. Tuples and value types)
- * @return suitable return type
- */
- def determineReturnType(
- logicalRowType: RelDataType,
- expectedPhysicalType: Option[TypeInformation[Any]],
- nullable: Boolean,
- useEfficientTypes: Boolean)
- : TypeInformation[Any] = {
- // convert to type information
- val logicalFieldTypes = logicalRowType.getFieldList map { relDataType =>
- FlinkTypeFactory.toTypeInfo(relDataType.getType)
- }
- // field names
- val logicalFieldNames = logicalRowType.getFieldNames.toList
-
- val returnType = expectedPhysicalType match {
- // a certain physical type is expected (but not Row)
- // check if expected physical type is compatible with logical field type
- case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
- if (typeInfo.getArity != logicalFieldTypes.length) {
- throw new TableException("Arity of result does not match expected type.")
- }
- typeInfo match {
-
- // POJO type expected
- case pt: PojoTypeInfo[_] =>
- logicalFieldNames.zip(logicalFieldTypes) foreach {
- case (fName, fType) =>
- val pojoIdx = pt.getFieldIndex(fName)
- if (pojoIdx < 0) {
- throw new TableException(s"POJO does not define field name: $fName")
- }
- val expectedTypeInfo = pt.getTypeAt(pojoIdx)
- if (fType != expectedTypeInfo) {
- throw new TableException(s"Result field does not match expected type. " +
- s"Expected: $expectedTypeInfo; Actual: $fType")
- }
- }
-
- // Tuple/Case class type expected
- case ct: CompositeType[_] =>
- logicalFieldTypes.zipWithIndex foreach {
- case (fieldTypeInfo, i) =>
- val expectedTypeInfo = ct.getTypeAt(i)
- if (fieldTypeInfo != expectedTypeInfo) {
- throw new TableException(s"Result field does not match expected type. " +
- s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo")
- }
- }
-
- // Atomic type expected
- case at: AtomicType[_] =>
- val fieldTypeInfo = logicalFieldTypes.head
- if (fieldTypeInfo != at) {
- throw new TableException(s"Result field does not match expected type. " +
- s"Expected: $at; Actual: $fieldTypeInfo")
- }
-
- case _ =>
- throw new TableException("Unsupported result type.")
- }
- typeInfo
-
- // Row is expected, create the arity for it
- case Some(typeInfo) if typeInfo.getTypeClass == classOf[Row] =>
- new RowTypeInfo(logicalFieldTypes: _*)
-
- // no physical type
- // determine type based on logical fields and configuration parameters
- case None =>
- // no need for efficient types -> use Row
- // we cannot use efficient types if row arity > tuple arity or nullable
- if (!useEfficientTypes || logicalFieldTypes.length > Tuple.MAX_ARITY || nullable) {
- new RowTypeInfo(logicalFieldTypes: _*)
- }
- // use efficient type tuple or atomic type
- else {
- if (logicalFieldTypes.length == 1) {
- logicalFieldTypes.head
- }
- else {
- new TupleTypeInfo[Tuple](logicalFieldTypes.toArray:_*)
- }
- }
- }
- returnType.asInstanceOf[TypeInformation[Any]]
- }
-
- def sqlJoinTypeToFlinkJoinType(sqlJoinType: JoinRelType): JoinType = sqlJoinType match {
- case INNER => JoinType.INNER
- case LEFT => JoinType.LEFT_OUTER
- case RIGHT => JoinType.RIGHT_OUTER
- case FULL => JoinType.FULL_OUTER
- }
-
- def flinkJoinTypeToRelType(joinType: JoinType) = joinType match {
- case JoinType.INNER => JoinRelType.INNER
- case JoinType.LEFT_OUTER => JoinRelType.LEFT
- case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
- case JoinType.FULL_OUTER => JoinRelType.FULL
- }
-}
[3/3] flink git commit: [FLINK-5662] [table] Rework internal type
handling of Table API
Posted by tw...@apache.org.
[FLINK-5662] [table] Rework internal type handling of Table API
This closes #3271.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bc6b225
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bc6b225
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bc6b225
Branch: refs/heads/master
Commit: 6bc6b225e55095eb8797db2903b0546410e7fdd9
Parents: 1ce10c8
Author: twalthr <tw...@apache.org>
Authored: Mon Feb 6 17:18:08 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Mon Feb 13 17:50:00 2017 +0100
----------------------------------------------------------------------
.../flink/table/api/BatchTableEnvironment.scala | 34 ++-
.../table/api/StreamTableEnvironment.scala | 41 ++--
.../apache/flink/table/api/TableConfig.scala | 24 --
.../flink/table/api/TableEnvironment.scala | 130 +++++++++-
.../flink/table/calcite/FlinkTypeFactory.scala | 17 +-
.../flink/table/codegen/CodeGenerator.scala | 33 +--
.../flink/table/codegen/ExpressionReducer.scala | 11 +-
.../apache/flink/table/codegen/generated.scala | 25 +-
.../flink/table/plan/logical/operators.scala | 18 +-
.../table/plan/nodes/CommonAggregate.scala | 69 ++++++
.../flink/table/plan/nodes/CommonCalc.scala | 152 ++++++++++++
.../table/plan/nodes/CommonCorrelate.scala | 229 ++++++++++++++++++
.../flink/table/plan/nodes/CommonScan.scala | 82 +++++++
.../flink/table/plan/nodes/FlinkAggregate.scala | 69 ------
.../flink/table/plan/nodes/FlinkCalc.scala | 172 -------------
.../flink/table/plan/nodes/FlinkCorrelate.scala | 233 ------------------
.../flink/table/plan/nodes/FlinkRel.scala | 37 ---
.../table/plan/nodes/dataset/BatchScan.scala | 61 ++---
.../nodes/dataset/BatchTableSourceScan.scala | 7 +-
.../plan/nodes/dataset/DataSetAggregate.scala | 82 ++-----
.../table/plan/nodes/dataset/DataSetCalc.scala | 27 +--
.../plan/nodes/dataset/DataSetCorrelate.scala | 12 +-
.../plan/nodes/dataset/DataSetIntersect.scala | 57 +----
.../table/plan/nodes/dataset/DataSetJoin.scala | 32 ++-
.../table/plan/nodes/dataset/DataSetMinus.scala | 57 +----
.../table/plan/nodes/dataset/DataSetRel.scala | 17 +-
.../table/plan/nodes/dataset/DataSetScan.scala | 8 +-
.../nodes/dataset/DataSetSingleRowJoin.scala | 48 ++--
.../table/plan/nodes/dataset/DataSetSort.scala | 49 +---
.../table/plan/nodes/dataset/DataSetUnion.scala | 24 +-
.../plan/nodes/dataset/DataSetValues.scala | 18 +-
.../nodes/dataset/DataSetWindowAggregate.scala | 68 ++----
.../nodes/datastream/DataStreamAggregate.scala | 239 ++++++++-----------
.../plan/nodes/datastream/DataStreamCalc.scala | 27 +--
.../nodes/datastream/DataStreamCorrelate.scala | 14 +-
.../plan/nodes/datastream/DataStreamRel.scala | 15 +-
.../plan/nodes/datastream/DataStreamScan.scala | 10 +-
.../plan/nodes/datastream/DataStreamUnion.scala | 6 +-
.../nodes/datastream/DataStreamValues.scala | 23 +-
.../plan/nodes/datastream/StreamScan.scala | 84 ++-----
.../datastream/StreamTableSourceScan.scala | 12 +-
.../table/runtime/aggregate/AggregateUtil.scala | 120 +++++-----
.../flink/table/typeutils/TypeConverter.scala | 156 ------------
.../api/java/batch/TableEnvironmentITCase.java | 7 +-
.../scala/batch/TableEnvironmentITCase.scala | 4 +-
.../batch/utils/TableProgramsTestBase.scala | 11 +-
.../expressions/utils/ExpressionTestBase.scala | 16 +-
47 files changed, 1184 insertions(+), 1503 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index dd0487a..2dec00e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -23,11 +23,12 @@ import _root_.java.util.concurrent.atomic.AtomicInteger
import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{Programs, RuleSet}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.Expression
@@ -135,7 +136,7 @@ abstract class BatchTableEnvironment(
private[flink] def explain(table: Table, extended: Boolean): String = {
val ast = table.getRelNode
val optimizedPlan = optimize(ast)
- val dataSet = translate[Row](optimizedPlan) (TypeExtractor.createTypeInfo(classOf[Row]))
+ val dataSet = translate[Row](optimizedPlan, ast.getRowType) (new GenericTypeInfo(classOf[Row]))
dataSet.output(new DiscardingOutputFormat[Row])
val env = dataSet.getExecutionEnvironment
val jasonSqlPlan = env.getExecutionPlan
@@ -250,28 +251,39 @@ abstract class BatchTableEnvironment(
* @return The [[DataSet]] that corresponds to the translated [[Table]].
*/
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
- val dataSetPlan = optimize(table.getRelNode)
- translate(dataSetPlan)
+ val relNode = table.getRelNode
+ val dataSetPlan = optimize(relNode)
+ translate(dataSetPlan, relNode.getRowType)
}
/**
- * Translates a logical [[RelNode]] into a [[DataSet]].
+ * Translates a logical [[RelNode]] into a [[DataSet]]. Converts to target type if necessary.
*
* @param logicalPlan The root node of the relational expression tree.
+ * @param logicalType The row type of the result. Since the logicalPlan can lose the
+ * field naming during optimization we pass the row type separately.
* @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
* @tparam A The type of the resulting [[DataSet]].
* @return The [[DataSet]] that corresponds to the translated [[Table]].
*/
- protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+ protected def translate[A](
+ logicalPlan: RelNode,
+ logicalType: RelDataType)
+ (implicit tpe: TypeInformation[A]): DataSet[A] = {
TableEnvironment.validateType(tpe)
logicalPlan match {
case node: DataSetRel =>
- node.translateToPlan(
- this,
- Some(tpe.asInstanceOf[TypeInformation[Any]])
- ).asInstanceOf[DataSet[A]]
- case _ => ???
+ val plan = node.translateToPlan(this)
+ val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataSetSinkConversion")
+ conversion match {
+ case None => plan.asInstanceOf[DataSet[A]] // no conversion necessary
+ case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe")
+ }
+
+ case _ =>
+ throw TableException("Cannot generate DataSet due to an invalid logical plan. " +
+ "This is a bug and should not happen. Please file an issue.")
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 81e884d..19c4af1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -23,10 +23,11 @@ import _root_.java.util.concurrent.atomic.AtomicInteger
import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{Programs, RuleSet}
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.explain.PlanJsonParser
@@ -200,11 +201,11 @@ abstract class StreamTableEnvironment(
dataStream: DataStream[T],
fields: Array[Expression]): Unit = {
- val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields.toArray)
+ val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields)
val dataStreamTable = new DataStreamTable[T](
dataStream,
- fieldIndexes.toArray,
- fieldNames.toArray
+ fieldIndexes,
+ fieldNames
)
registerTableInternal(name, dataStreamTable)
}
@@ -255,30 +256,40 @@ abstract class StreamTableEnvironment(
* @return The [[DataStream]] that corresponds to the translated [[Table]].
*/
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
- val dataStreamPlan = optimize(table.getRelNode)
- translate(dataStreamPlan)
+ val relNode = table.getRelNode
+ val dataStreamPlan = optimize(relNode)
+ translate(dataStreamPlan, relNode.getRowType)
}
/**
* Translates a logical [[RelNode]] into a [[DataStream]].
*
* @param logicalPlan The root node of the relational expression tree.
+ * @param logicalType The row type of the result. Since the logicalPlan can lose the
+ * field naming during optimization we pass the row type separately.
* @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
* @tparam A The type of the resulting [[DataStream]].
* @return The [[DataStream]] that corresponds to the translated [[Table]].
*/
- protected def translate[A]
- (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+ protected def translate[A](
+ logicalPlan: RelNode,
+ logicalType: RelDataType)
+ (implicit tpe: TypeInformation[A]): DataStream[A] = {
TableEnvironment.validateType(tpe)
logicalPlan match {
case node: DataStreamRel =>
- node.translateToPlan(
- this,
- Some(tpe.asInstanceOf[TypeInformation[Any]])
- ).asInstanceOf[DataStream[A]]
- case _ => ???
+ val plan = node.translateToPlan(this)
+ val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataStreamSinkConversion")
+ conversion match {
+ case None => plan.asInstanceOf[DataStream[A]] // no conversion necessary
+ case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe")
+ }
+
+ case _ =>
+ throw TableException("Cannot generate DataStream due to an invalid logical plan. " +
+ "This is a bug and should not happen. Please file an issue.")
}
}
@@ -291,7 +302,9 @@ abstract class StreamTableEnvironment(
def explain(table: Table): String = {
val ast = table.getRelNode
val optimizedPlan = optimize(ast)
- val dataStream = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
+ val dataStream = translate[Row](
+ optimizedPlan,
+ ast.getRowType)(new GenericTypeInfo(classOf[Row]))
val env = dataStream.getExecutionEnvironment
val jsonSqlPlan = env.getExecutionPlan
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
index a8876a8..6448657 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
@@ -37,12 +37,6 @@ class TableConfig {
private var nullCheck: Boolean = true
/**
- * Defines if efficient types (such as Tuple types or Atomic types)
- * should be used within operators where possible.
- */
- private var efficientTypeUsage = false
-
- /**
* Defines the configuration of Calcite for Table API and SQL queries.
*/
private var calciteConfig = CalciteConfig.DEFAULT
@@ -73,24 +67,6 @@ class TableConfig {
}
/**
- * Returns the usage of efficient types. If enabled, efficient types (such as Tuple types
- * or Atomic types) are used within operators where possible.
- *
- * NOTE: Currently, this is an experimental feature.
- */
- def getEfficientTypeUsage = efficientTypeUsage
-
- /**
- * Sets the usage of efficient types. If enabled, efficient types (such as Tuple types
- * or Atomic types) are used within operators where possible.
- *
- * NOTE: Currently, this is an experimental feature.
- */
- def setEfficientTypeUsage(efficientTypeUsage: Boolean): Unit = {
- this.efficientTypeUsage = efficientTypeUsage
- }
-
- /**
* Returns the current configuration of Calcite for Table API and SQL queries.
*/
def getCalciteConfig: CalciteConfig = calciteConfig
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index bcff1fb..b36441a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -31,27 +31,30 @@ import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.sql.util.ChainedSqlOperatorTable
import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets}
+import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
-import java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
+import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
-import org.apache.flink.table.codegen.ExpressionReducer
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
import org.apache.flink.table.plan.cost.DataSetCostFactory
import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
-import org.apache.flink.table.plan.schema.{RelTable, TableSourceTable}
+import org.apache.flink.table.plan.schema.RelTable
+import org.apache.flink.table.runtime.MapRunner
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.types.Row
import _root_.scala.collection.JavaConverters._
@@ -410,7 +413,7 @@ abstract class TableEnvironment(val config: TableConfig) {
}
exprs.map {
case UnresolvedFieldReference(name) => (0, name)
- case _ => throw new TableException("Field reference expression expected.")
+ case _ => throw new TableException("Field reference expression requested.")
}
case t: TupleTypeInfo[A] =>
exprs.zipWithIndex.map {
@@ -466,6 +469,123 @@ abstract class TableEnvironment(val config: TableConfig) {
(fieldNames.toArray, fieldIndexes.toArray)
}
+ /**
+ * Creates a final converter that maps the internal row type to the requested external type.
+ *
+ * @param physicalRowTypeInfo the input of the sink
+ * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+ * @param requestedTypeInfo the output type of the sink
+ * @param functionName name of the map function; need not be unique, but must be a valid Java class identifier
+ * @return the generated conversion function, or None if a generic Row is requested (no conversion needed)
+ */
+ protected def sinkConversion[T](
+ physicalRowTypeInfo: TypeInformation[Row],
+ logicalRowType: RelDataType,
+ requestedTypeInfo: TypeInformation[T],
+ functionName: String)
+ : Option[MapFunction[Row, T]] = {
+
+ // validate that at least the field types of physical and logical type match
+ // we do that here to make sure that plan translation was correct
+ val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType)
+ if (physicalRowTypeInfo != logicalRowTypeInfo) {
+ throw TableException("The field types of physical and logical row types do not match. " +
+ "This is a bug and should not happen. Please file an issue.")
+ }
+
+ // requested type is a generic Row, no conversion needed
+ if (requestedTypeInfo.isInstanceOf[GenericTypeInfo[_]] &&
+ requestedTypeInfo.getTypeClass == classOf[Row]) {
+ return None
+ }
+
+ // convert to type information
+ val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType =>
+ FlinkTypeFactory.toTypeInfo(relDataType.getType)
+ }
+ // field names
+ val logicalFieldNames = logicalRowType.getFieldNames.asScala
+
+ // validate requested type
+ if (requestedTypeInfo.getArity != logicalFieldTypes.length) {
+ throw new TableException("Arity of result does not match requested type.")
+ }
+ requestedTypeInfo match {
+
+ // POJO type requested
+ case pt: PojoTypeInfo[_] =>
+ logicalFieldNames.zip(logicalFieldTypes) foreach {
+ case (fName, fType) =>
+ val pojoIdx = pt.getFieldIndex(fName)
+ if (pojoIdx < 0) {
+ throw new TableException(s"POJO does not define field name: $fName")
+ }
+ val requestedTypeInfo = pt.getTypeAt(pojoIdx)
+ if (fType != requestedTypeInfo) {
+ throw new TableException(s"Result field does not match requested type. " +
+ s"Requested: $requestedTypeInfo; Actual: $fType")
+ }
+ }
+
+ // Tuple/Case class/Row type requested
+ case tt: TupleTypeInfoBase[_] =>
+ logicalFieldTypes.zipWithIndex foreach {
+ case (fieldTypeInfo, i) =>
+ val requestedTypeInfo = tt.getTypeAt(i)
+ if (fieldTypeInfo != requestedTypeInfo) {
+ throw new TableException(s"Result field does not match requested type. " +
+ s"Requested: $requestedTypeInfo; Actual: $fieldTypeInfo")
+ }
+ }
+
+ // Atomic type requested
+ case at: AtomicType[_] =>
+ if (logicalFieldTypes.size != 1) {
+ throw new TableException(s"Requested result type is an atomic type but " +
+ s"result has more or less than a single field.")
+ }
+ val fieldTypeInfo = logicalFieldTypes.head
+ if (fieldTypeInfo != at) {
+ throw new TableException(s"Result field does not match requested type. " +
+ s"Requested: $at; Actual: $fieldTypeInfo")
+ }
+
+ case _ =>
+ throw new TableException(s"Unsupported result type: $requestedTypeInfo")
+ }
+
+ // code generate MapFunction
+ val generator = new CodeGenerator(
+ config,
+ false,
+ physicalRowTypeInfo,
+ None,
+ None)
+
+ val conversion = generator.generateConverterResultExpression(
+ requestedTypeInfo,
+ logicalFieldNames)
+
+ val body =
+ s"""
+ |${conversion.code}
+ |return ${conversion.resultTerm};
+ |""".stripMargin
+
+ val genFunction = generator.generateFunction(
+ functionName,
+ classOf[MapFunction[Row, T]],
+ body,
+ requestedTypeInfo)
+
+ val mapFunction = new MapRunner[Row, T](
+ genFunction.name,
+ genFunction.code,
+ genFunction.returnType)
+
+ Some(mapFunction)
+ }
+
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index f3e2f91..251be14 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -28,7 +28,7 @@ import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo}
import org.apache.flink.api.java.typeutils.ValueTypeInfo._
import org.apache.flink.table.api.TableException
import org.apache.flink.table.plan.schema.{CompositeRelDataType, GenericRelDataType}
@@ -36,8 +36,10 @@ import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple
import org.apache.flink.table.plan.schema.ArrayRelDataType
import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName
+import org.apache.flink.types.Row
import scala.collection.mutable
+import scala.collection.JavaConverters._
/**
* Flink specific type factory that represents the interface between Flink's [[TypeInformation]]
@@ -167,6 +169,19 @@ object FlinkTypeFactory {
throw TableException(s"Type is not supported: $t")
}
+ /**
+ * Converts a Calcite logical record type into Flink row type information.
+ */
+ def toInternalRowTypeInfo(logicalRowType: RelDataType): TypeInformation[Row] = {
+ // convert to type information
+ val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType =>
+ FlinkTypeFactory.toTypeInfo(relDataType.getType)
+ }
+ // field names
+ val logicalFieldNames = logicalRowType.getFieldNames.asScala
+ new RowTypeInfo(logicalFieldTypes.toArray, logicalFieldNames.toArray)
+ }
+
def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match {
case BOOLEAN => BOOLEAN_TYPE_INFO
case TINYINT => BYTE_TYPE_INFO
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index d49d7a0..c679bd8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -40,8 +40,8 @@ import org.apache.flink.table.codegen.calls.FunctionGenerator
import org.apache.flink.table.codegen.calls.ScalarOperators._
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.runtime.TableFunctionCollector
-import org.apache.flink.table.typeutils.TypeConverter
import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -62,8 +62,8 @@ import scala.collection.mutable
class CodeGenerator(
config: TableConfig,
nullableInput: Boolean,
- input1: TypeInformation[Any],
- input2: Option[TypeInformation[Any]] = None,
+ input1: TypeInformation[_ <: Any],
+ input2: Option[TypeInformation[_ <: Any]] = None,
input1PojoFieldMapping: Option[Array[Int]] = None,
input2PojoFieldMapping: Option[Array[Int]] = None)
extends RexVisitor[GeneratedExpression] {
@@ -112,7 +112,7 @@ class CodeGenerator(
* @param config configuration that determines runtime behavior
*/
def this(config: TableConfig) =
- this(config, false, TypeConverter.DEFAULT_ROW_TYPE, None, None)
+ this(config, false, new RowTypeInfo(), None, None)
// set of member statements that will be added only once
// we use a LinkedHashSet to keep the insertion order
@@ -224,15 +224,16 @@ class CodeGenerator(
* @param bodyCode code contents of the SAM (Single Abstract Method). Inputs, collector, or
* output record can be accessed via the given term methods.
* @param returnType expected return type
- * @tparam T Flink Function to be generated.
+ * @tparam F Flink Function to be generated.
+ * @tparam T Return type of the Flink Function.
* @return instance of GeneratedFunction
*/
- def generateFunction[T <: Function](
+ def generateFunction[F <: Function, T <: Any](
name: String,
- clazz: Class[T],
+ clazz: Class[F],
bodyCode: String,
- returnType: TypeInformation[Any])
- : GeneratedFunction[T] = {
+ returnType: TypeInformation[T])
+ : GeneratedFunction[F, T] = {
val funcName = newName(name)
// Janino does not support generics, that's why we need
@@ -298,14 +299,14 @@ class CodeGenerator(
* valid Java class identifier.
* @param records code for creating records
* @param returnType expected return type
- * @tparam T Flink Function to be generated.
+ * @tparam T Return type of the Flink Function.
* @return instance of GeneratedFunction
*/
- def generateValuesInputFormat[T](
+ def generateValuesInputFormat[T <: Row](
name: String,
records: Seq[String],
- returnType: TypeInformation[Any])
- : GeneratedFunction[GenericInputFormat[T]] = {
+ returnType: TypeInformation[T])
+ : GeneratedInput[GenericInputFormat[T], T] = {
val funcName = newName(name)
addReusableOutRecord(returnType)
@@ -343,7 +344,7 @@ class CodeGenerator(
}
""".stripMargin
- GeneratedFunction[GenericInputFormat[T]](funcName, returnType, funcCode)
+ GeneratedInput(funcName, returnType, funcCode)
}
/**
@@ -1094,7 +1095,7 @@ class CodeGenerator(
// ----------------------------------------------------------------------------------------------
private def generateInputAccess(
- inputType: TypeInformation[Any],
+ inputType: TypeInformation[_ <: Any],
inputTerm: String,
index: Int,
pojoFieldMapping: Option[Array[Int]])
@@ -1122,7 +1123,7 @@ class CodeGenerator(
}
private def generateNullableInputFieldAccess(
- inputType: TypeInformation[Any],
+ inputType: TypeInformation[_ <: Any],
inputTerm: String,
index: Int,
pojoFieldMapping: Option[Array[Int]])
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index 94007de..0f1de21 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -24,11 +24,10 @@ import org.apache.calcite.plan.RelOptPlanner
import org.apache.calcite.rex.{RexBuilder, RexNode}
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.typeutils.TypeConverter
import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -39,7 +38,7 @@ import scala.collection.JavaConverters._
class ExpressionReducer(config: TableConfig)
extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] {
- private val EMPTY_ROW_INFO = TypeConverter.DEFAULT_ROW_TYPE
+ private val EMPTY_ROW_INFO = new RowTypeInfo()
private val EMPTY_ROW = new Row(0)
override def reduce(
@@ -82,14 +81,14 @@ class ExpressionReducer(config: TableConfig)
resultType.getFieldNames,
literals)
- val generatedFunction = generator.generateFunction[MapFunction[Row, Row]](
+ val generatedFunction = generator.generateFunction[MapFunction[Row, Row], Row](
"ExpressionReducer",
classOf[MapFunction[Row, Row]],
s"""
|${result.code}
|return ${result.resultTerm};
|""".stripMargin,
- resultType.asInstanceOf[TypeInformation[Any]])
+ resultType)
val clazz = compile(getClass.getClassLoader, generatedFunction.name, generatedFunction.code)
val function = clazz.newInstance()
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
index b4c293d..271f686 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
@@ -18,6 +18,9 @@
package org.apache.flink.table.codegen
+import org.apache.flink.api.common.functions
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
/**
@@ -41,14 +44,32 @@ object GeneratedExpression {
}
/**
- * Describes a generated [[org.apache.flink.api.common.functions.Function]]
+ * Describes a generated [[functions.Function]]
*
* @param name class name of the generated Function.
* @param returnType the type information of the result type
* @param code code of the generated Function.
+ * @tparam F type of function
* @tparam T type of function
*/
-case class GeneratedFunction[T](name: String, returnType: TypeInformation[Any], code: String)
+case class GeneratedFunction[F <: Function, T <: Any](
+ name: String,
+ returnType: TypeInformation[T],
+ code: String)
+
+/**
+ * Describes a generated [[InputFormat]].
+ *
+ * @param name class name of the generated input function.
+ * @param returnType the type information of the result type
+ * @param code code of the generated Function.
+ * @tparam F type of the input format
+ * @tparam T return type of the input format
+ */
+case class GeneratedInput[F <: InputFormat[_, _], T <: Any](
+ name: String,
+ returnType: TypeInformation[T],
+ code: String)
/**
* Describes a generated [[org.apache.flink.util.Collector]].
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 3ba0285..20f810a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -22,14 +22,13 @@ import java.util
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.CorrelationId
-import org.apache.calcite.rel.logical.{LogicalProject, LogicalTableFunctionScan}
+import org.apache.calcite.rel.core.{CorrelationId, JoinRelType}
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
import org.apache.calcite.rex.{RexInputRef, RexNode}
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.table._
import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment, UnresolvedException}
import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
import org.apache.flink.table.expressions._
@@ -37,7 +36,6 @@ import org.apache.flink.table.functions.TableFunction
import org.apache.flink.table.functions.utils.TableSqlFunction
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
-import org.apache.flink.table.typeutils.TypeConverter
import org.apache.flink.table.validate.{ValidationFailure, ValidationSuccess}
import scala.collection.JavaConverters._
@@ -426,11 +424,18 @@ case class Join(
}
relBuilder.join(
- TypeConverter.flinkJoinTypeToRelType(joinType),
+ convertJoinType(joinType),
condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)),
corSet.asJava)
}
+ private def convertJoinType(joinType: JoinType) = joinType match {
+ case JoinType.INNER => JoinRelType.INNER
+ case JoinType.LEFT_OUTER => JoinRelType.LEFT
+ case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
+ case JoinType.FULL_OUTER => JoinRelType.FULL
+ }
+
private def ambiguousName: Set[String] =
left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet)
@@ -481,13 +486,12 @@ case class Join(
if (checkIfFilterCondition(x)) {
localPredicateFound = true
}
- case x: BinaryComparison => {
+ case x: BinaryComparison =>
if (checkIfFilterCondition(x)) {
localPredicateFound = true
} else {
nonEquiJoinPredicateFound = true
}
- }
case x => failValidation(
s"Unsupported condition type: ${x.getClass.getSimpleName}. Condition: $x")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
new file mode 100644
index 0000000..3883b14
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+
+import scala.collection.JavaConverters._
+
+trait CommonAggregate {
+
+ private[flink] def groupingToString(inputType: RelDataType, grouping: Array[Int]): String = {
+
+ val inFields = inputType.getFieldNames.asScala
+ grouping.map( inFields(_) ).mkString(", ")
+ }
+
+ private[flink] def aggregationToString(
+ inputType: RelDataType,
+ grouping: Array[Int],
+ rowType: RelDataType,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ namedProperties: Seq[NamedWindowProperty])
+ : String = {
+
+ val inFields = inputType.getFieldNames.asScala
+ val outFields = rowType.getFieldNames.asScala
+
+ val groupStrings = grouping.map( inFields(_) )
+
+ val aggs = namedAggregates.map(_.getKey)
+ val aggStrings = aggs.map( a => s"${a.getAggregation}(${
+ if (a.getArgList.size() > 0) {
+ inFields(a.getArgList.get(0))
+ } else {
+ "*"
+ }
+ })")
+
+ val propStrings = namedProperties.map(_.property.toString)
+
+ (groupStrings ++ aggStrings ++ propStrings).zip(outFields).map {
+ case (f, o) => if (f == o) {
+ f
+ } else {
+ s"$f AS $o"
+ }
+ }.mkString(", ")
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
new file mode 100644
index 0000000..3f46258
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexNode, RexProgram}
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
+import org.apache.flink.table.runtime.FlatMapRunner
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+  * Code generation and text rendering shared by calc (filter and projection) plan nodes.
+  */
+trait CommonCalc {
+
+  /**
+    * Generates the body of the flat-map function that evaluates a calc program.
+    *
+    * Without a condition the body computes the projection and collects its result. With a
+    * condition the body evaluates it first and collects only inside `if (<condition result>)`.
+    *
+    * @param generator   code generator that produces the expressions and provides the terms
+    * @param inputType   type of the input rows (not read by this method)
+    * @param rowType     result row type; determines the return type and the field names
+    * @param calcProgram program holding the projection list and the optional condition
+    * @param config      table configuration (not read by this method)
+    * @return source code of the function body
+    */
+  private[flink] def functionBody(
+      generator: CodeGenerator,
+      inputType: TypeInformation[Row],
+      rowType: RelDataType,
+      calcProgram: RexProgram,
+      config: TableConfig)
+    : String = {
+
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
+
+    val condition = calcProgram.getCondition
+    // Java lists are converted explicitly with asScala instead of relying on the implicit
+    // views of the deprecated scala.collection.JavaConversions.
+    val expandedExpressions = calcProgram.getProjectList.asScala.map(
+      expr => calcProgram.expandLocalRef(expr))
+    val projection = generator.generateResultExpression(
+      returnType,
+      rowType.getFieldNames.asScala,
+      expandedExpressions)
+
+    // only projection
+    if (condition == null) {
+      s"""
+        |${projection.code}
+        |${generator.collectorTerm}.collect(${projection.resultTerm});
+        |""".stripMargin
+    }
+    else {
+      val filterCondition = generator.generateExpression(
+        calcProgram.expandLocalRef(condition))
+      // only filter
+      // NOTE(review): generateResultExpression is not visible here; whether it can return
+      // null, i.e. whether this branch is reachable, needs to be confirmed.
+      if (projection == null) {
+        s"""
+          |${filterCondition.code}
+          |if (${filterCondition.resultTerm}) {
+          | ${generator.collectorTerm}.collect(${generator.input1Term});
+          |}
+          |""".stripMargin
+      }
+      // both filter and projection
+      else {
+        s"""
+          |${filterCondition.code}
+          |if (${filterCondition.resultTerm}) {
+          | ${projection.code}
+          | ${generator.collectorTerm}.collect(${projection.resultTerm});
+          |}
+          |""".stripMargin
+      }
+    }
+  }
+
+  /**
+    * Wraps a generated flat-map function in a [[FlatMapRunner]] built from the function's
+    * name, code and return type.
+    */
+  private[flink] def calcMapFunction(
+      genFunction: GeneratedFunction[FlatMapFunction[Row, Row], Row])
+    : RichFlatMapFunction[Row, Row] = {
+
+    new FlatMapRunner[Row, Row](
+      genFunction.name,
+      genFunction.code,
+      genFunction.returnType)
+  }
+
+  /**
+    * Renders the condition of a calc program.
+    *
+    * @param calcProgram program whose condition is rendered
+    * @param expression  renderer called with the node, the input field names and the
+    *                    program's local expressions
+    * @return the rendered condition, or an empty string if the program has no condition
+    */
+  private[flink] def conditionToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+    val cond = calcProgram.getCondition
+    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
+    val localExprs = calcProgram.getExprList.asScala.toList
+
+    if (cond != null) {
+      expression(cond, inFields, Some(localExprs))
+    } else {
+      ""
+    }
+  }
+
+  /**
+    * Renders the projection list of a calc program.
+    *
+    * Each projected expression is paired with the output field at the same position. An
+    * expression whose text differs from the output field name is printed as
+    * `expression AS outputField`, otherwise the text is printed as is.
+    *
+    * @param calcProgram program whose projection is rendered
+    * @param expression  renderer called with the node, the input field names and the
+    *                    program's local expressions
+    * @return the rendered projections joined with ", "
+    */
+  private[flink] def selectionToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+    val proj = calcProgram.getProjectList.asScala.toList
+    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
+    val localExprs = calcProgram.getExprList.asScala.toList
+    val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList
+
+    proj
+      .map(expression(_, inFields, Some(localExprs)))
+      .zip(outFields).map { case (e, o) =>
+        if (e != o) {
+          e + " AS " + o
+        } else {
+          e
+        }
+    }.mkString(", ")
+  }
+
+  /**
+    * Builds the operator name of a calc program: `select: (...)`, prefixed with
+    * `where: (...), ` if the program has a condition.
+    */
+  private[flink] def calcOpName(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+    val conditionStr = conditionToString(calcProgram, expression)
+    val selectionStr = selectionToString(calcProgram, expression)
+
+    s"${if (calcProgram.getCondition != null) {
+      s"where: ($conditionStr), "
+    } else {
+      ""
+    }}select: ($selectionStr)"
+  }
+
+  /**
+    * Renders a calc program as `Calc(<operator name>)`, see [[calcOpName]].
+    */
+  private[flink] def calcToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+    val name = calcOpName(calcProgram, expression)
+    s"Calc($name)"
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
new file mode 100644
index 0000000..61b7ffb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
+import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction}
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector}
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * Code generation and naming helpers for joining a table with a user-defined table function.
+  */
+trait CommonCorrelate {
+
+  /**
+    * Builds the [[CorrelateFlatMapRunner]] that joins the input table with a user-defined
+    * table function.
+    *
+    * Two pieces of code are generated, in this order: the flat-map function that invokes the
+    * table function, and the collector that receives the rows it produces.
+    */
+  private[flink] def correlateMapFunction(
+      config: TableConfig,
+      inputTypeInfo: TypeInformation[Row],
+      udtfTypeInfo: TypeInformation[Any],
+      rowType: RelDataType,
+      joinType: SemiJoinType,
+      rexCall: RexCall,
+      condition: Option[RexNode],
+      pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping
+      ruleDescription: String)
+    : CorrelateFlatMapRunner[Row, Row] = {
+
+    val resultTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
+
+    val udtfFunction = generateFunction(
+      config = config,
+      inputTypeInfo = inputTypeInfo,
+      udtfTypeInfo = udtfTypeInfo,
+      returnType = resultTypeInfo,
+      rowType = rowType,
+      joinType = joinType,
+      rexCall = rexCall,
+      pojoFieldMapping = pojoFieldMapping,
+      ruleDescription = ruleDescription)
+
+    val udtfCollector = generateCollector(
+      config = config,
+      inputTypeInfo = inputTypeInfo,
+      udtfTypeInfo = udtfTypeInfo,
+      returnType = resultTypeInfo,
+      rowType = rowType,
+      condition = condition,
+      pojoFieldMapping = pojoFieldMapping)
+
+    new CorrelateFlatMapRunner[Row, Row](
+      udtfFunction.name,
+      udtfFunction.code,
+      udtfCollector.name,
+      udtfCollector.code,
+      udtfFunction.returnType)
+  }
+
+  /**
+    * Generates the flat-map function that calls the user-defined table function.
+    *
+    * For a LEFT join the function additionally emits a row with null table function fields
+    * when the collector reports that nothing was collected. INNER adds nothing; any other
+    * join type is rejected with a [[TableException]].
+    */
+  private def generateFunction(
+      config: TableConfig,
+      inputTypeInfo: TypeInformation[Row],
+      udtfTypeInfo: TypeInformation[Any],
+      returnType: TypeInformation[Row],
+      rowType: RelDataType,
+      joinType: SemiJoinType,
+      rexCall: RexCall,
+      pojoFieldMapping: Option[Array[Int]],
+      ruleDescription: String)
+    : GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
+
+    val functionGenerator = new CodeGenerator(
+      config,
+      false,
+      inputTypeInfo,
+      Some(udtfTypeInfo),
+      None,
+      pojoFieldMapping)
+
+    val (input1AccessExprs, input2AccessExprs) = functionGenerator.generateCorrelateAccessExprs
+
+    val collectorTerm = functionGenerator
+      .addReusableConstructor(classOf[TableFunctionCollector[_]])
+      .head
+
+    val call = functionGenerator.generateExpression(rexCall)
+
+    // hand the collector to the table function, then run the call
+    val invokeCode =
+      s"""
+        |${call.resultTerm}.setCollector($collectorTerm);
+        |${call.code}
+        |""".stripMargin
+
+    val outerJoinCode = joinType match {
+      case SemiJoinType.LEFT =>
+        // left outer join: if the table function collected nothing, emit one row whose
+        // table function fields are all null
+        val nullUdtfFields = input2AccessExprs.map { udtfField =>
+          GeneratedExpression(
+            primitiveDefaultValue(udtfField.resultType),
+            ALWAYS_NULL,
+            NO_CODE,
+            udtfField.resultType)
+        }
+        val outerRow = functionGenerator.generateResultExpression(
+          input1AccessExprs ++ nullUdtfFields, returnType, rowType.getFieldNames.asScala)
+        s"""
+          |boolean hasOutput = $collectorTerm.isCollected();
+          |if (!hasOutput) {
+          | ${outerRow.code}
+          | ${functionGenerator.collectorTerm}.collect(${outerRow.resultTerm});
+          |}
+          |""".stripMargin
+
+      case SemiJoinType.INNER =>
+        ""
+
+      case _ =>
+        throw TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
+    }
+
+    functionGenerator.generateFunction(
+      ruleDescription,
+      classOf[FlatMapFunction[Row, Row]],
+      invokeCode + outerJoinCode,
+      returnType)
+  }
+
+  /**
+    * Generates the collector for the rows produced by the table function.
+    *
+    * The collector builds the result row from the input fields and the table function fields
+    * and forwards it. If a condition is given, the row is built and forwarded only when the
+    * condition holds.
+    */
+  private[flink] def generateCollector(
+      config: TableConfig,
+      inputTypeInfo: TypeInformation[Row],
+      udtfTypeInfo: TypeInformation[Any],
+      returnType: TypeInformation[Row],
+      rowType: RelDataType,
+      condition: Option[RexNode],
+      pojoFieldMapping: Option[Array[Int]])
+    : GeneratedCollector = {
+
+    val generator = new CodeGenerator(
+      config,
+      false,
+      inputTypeInfo,
+      Some(udtfTypeInfo),
+      None,
+      pojoFieldMapping)
+
+    val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
+
+    val joinedRow = generator.generateResultExpression(
+      input1AccessExprs ++ input2AccessExprs,
+      returnType,
+      rowType.getFieldNames.asScala)
+
+    val collectorCode = condition.map { predicate =>
+      // the condition is generated by a separate generator over the table function type,
+      // whose first input term is redirected to the second input term
+      val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo)
+      filterGenerator.input1Term = filterGenerator.input2Term
+      val filterCondition = filterGenerator.generateExpression(predicate)
+      s"""
+        |${filterGenerator.reuseInputUnboxingCode()}
+        |${filterCondition.code}
+        |if (${filterCondition.resultTerm}) {
+        | ${joinedRow.code}
+        | getCollector().collect(${joinedRow.resultTerm});
+        |}
+        |""".stripMargin
+    }.getOrElse {
+      s"""
+        |${joinedRow.code}
+        |getCollector().collect(${joinedRow.resultTerm});
+        |""".stripMargin
+    }
+
+    generator.generateTableFunctionCollector(
+      "TableFunctionCollector",
+      collectorCode,
+      udtfTypeInfo)
+  }
+
+  /**
+    * Renders the field names of the given row type, joined with ",".
+    */
+  private[flink] def selectToString(rowType: RelDataType): String = {
+    val fieldNames = rowType.getFieldNames.asScala
+    fieldNames.mkString(",")
+  }
+
+  /**
+    * Builds the operator name `correlate: <table function call>, select: <field names>`.
+    */
+  private[flink] def correlateOpName(
+      rexCall: RexCall,
+      sqlFunction: TableSqlFunction,
+      rowType: RelDataType)
+    : String = {
+
+    val correlateStr = correlateToString(rexCall, sqlFunction)
+    val selectStr = selectToString(rowType)
+    s"correlate: $correlateStr, select: $selectStr"
+  }
+
+  /**
+    * Renders the table function call as `table(<function name>(<operands>))`, with the
+    * operands joined with ",".
+    */
+  private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = {
+    val functionName = sqlFunction.getName
+    val operandList = rexCall.getOperands.asScala.map(_.toString).mkString(",")
+    s"table($functionName($operandList))"
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
new file mode 100644
index 0000000..274b602
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.types.Row
+
+/**
+  * Helpers shared by batch and stream scans.
+  */
+trait CommonScan {
+
+  /**
+    * Tells whether scanned records have to be converted: a conversion is needed whenever the
+    * external type information is not equal to the internal row type information.
+    */
+  private[flink] def needsConversion(
+      externalTypeInfo: TypeInformation[Any],
+      internalTypeInfo: TypeInformation[Row])
+    : Boolean = {
+
+    !(externalTypeInfo == internalTypeInfo)
+  }
+
+  /**
+    * Generates a map function that converts records of `inputType` into rows of
+    * `expectedType`.
+    *
+    * @param config                 table configuration handed to the code generator
+    * @param inputType              type of the records to convert
+    * @param expectedType           row type to produce
+    * @param conversionOperatorName name given to the generated function
+    * @param fieldNames             field names used for the converted result
+    * @param inputPojoFieldMapping  optional field mapping handed to the code generator
+    * @return a [[MapRunner]] wrapping the generated function
+    */
+  private[flink] def getConversionMapper(
+      config: TableConfig,
+      inputType: TypeInformation[Any],
+      expectedType: TypeInformation[Row],
+      conversionOperatorName: String,
+      fieldNames: Seq[String],
+      inputPojoFieldMapping: Option[Array[Int]] = None)
+    : MapFunction[Any, Row] = {
+
+    val codeGenerator = new CodeGenerator(config, false, inputType, None, inputPojoFieldMapping)
+    val converted = codeGenerator.generateConverterResultExpression(expectedType, fieldNames)
+
+    // the generated map function runs the conversion code and returns its result term
+    val mapBody =
+      s"""
+        |${converted.code}
+        |return ${converted.resultTerm};
+        |""".stripMargin
+
+    val generated = codeGenerator.generateFunction(
+      conversionOperatorName,
+      classOf[MapFunction[Any, Row]],
+      mapBody,
+      expectedType)
+
+    new MapRunner[Any, Row](generated.name, generated.code, generated.returnType)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala
deleted file mode 100644
index 7290594..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.flink.table.calcite.FlinkRelBuilder
-import FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.runtime.aggregate.AggregateUtil._
-
-import scala.collection.JavaConverters._
-
-trait FlinkAggregate {
-
- private[flink] def groupingToString(inputType: RelDataType, grouping: Array[Int]): String = {
-
- val inFields = inputType.getFieldNames.asScala
- grouping.map( inFields(_) ).mkString(", ")
- }
-
- private[flink] def aggregationToString(
- inputType: RelDataType,
- grouping: Array[Int],
- rowType: RelDataType,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- namedProperties: Seq[NamedWindowProperty])
- : String = {
-
- val inFields = inputType.getFieldNames.asScala
- val outFields = rowType.getFieldNames.asScala
-
- val groupStrings = grouping.map( inFields(_) )
-
- val aggs = namedAggregates.map(_.getKey)
- val aggStrings = aggs.map( a => s"${a.getAggregation}(${
- if (a.getArgList.size() > 0) {
- inFields(a.getArgList.get(0))
- } else {
- "*"
- }
- })")
-
- val propStrings = namedProperties.map(_.property.toString)
-
- (groupStrings ++ aggStrings ++ propStrings).zip(outFields).map {
- case (f, o) => if (f == o) {
- f
- } else {
- s"$f AS $o"
- }
- }.mkString(", ")
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala
deleted file mode 100644
index 5ebd3ee..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexNode, RexProgram}
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
-import org.apache.flink.table.runtime.FlatMapRunner
-import org.apache.flink.table.typeutils.TypeConverter._
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-trait FlinkCalc {
-
- private[flink] def functionBody(
- generator: CodeGenerator,
- inputType: TypeInformation[Any],
- rowType: RelDataType,
- calcProgram: RexProgram,
- config: TableConfig,
- expectedType: Option[TypeInformation[Any]]): String = {
-
- val returnType = determineReturnType(
- rowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- val condition = calcProgram.getCondition
- val expandedExpressions = calcProgram.getProjectList.map(
- expr => calcProgram.expandLocalRef(expr))
- val projection = generator.generateResultExpression(
- returnType,
- rowType.getFieldNames,
- expandedExpressions)
-
- // only projection
- if (condition == null) {
- s"""
- |${projection.code}
- |${generator.collectorTerm}.collect(${projection.resultTerm});
- |""".stripMargin
- }
- else {
- val filterCondition = generator.generateExpression(
- calcProgram.expandLocalRef(calcProgram.getCondition))
- // only filter
- if (projection == null) {
- // conversion
- if (inputType != returnType) {
- val conversion = generator.generateConverterResultExpression(
- returnType,
- rowType.getFieldNames)
-
- s"""
- |${filterCondition.code}
- |if (${filterCondition.resultTerm}) {
- | ${conversion.code}
- | ${generator.collectorTerm}.collect(${conversion.resultTerm});
- |}
- |""".stripMargin
- }
- // no conversion
- else {
- s"""
- |${filterCondition.code}
- |if (${filterCondition.resultTerm}) {
- | ${generator.collectorTerm}.collect(${generator.input1Term});
- |}
- |""".stripMargin
- }
- }
- // both filter and projection
- else {
- s"""
- |${filterCondition.code}
- |if (${filterCondition.resultTerm}) {
- | ${projection.code}
- | ${generator.collectorTerm}.collect(${projection.resultTerm});
- |}
- |""".stripMargin
- }
- }
- }
-
- private[flink] def calcMapFunction(
- genFunction: GeneratedFunction[FlatMapFunction[Any, Any]]): RichFlatMapFunction[Any, Any] = {
-
- new FlatMapRunner[Any, Any](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
- }
-
- private[flink] def conditionToString(
- calcProgram: RexProgram,
- expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
-
- val cond = calcProgram.getCondition
- val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
- val localExprs = calcProgram.getExprList.asScala.toList
-
- if (cond != null) {
- expression(cond, inFields, Some(localExprs))
- } else {
- ""
- }
- }
-
- private[flink] def selectionToString(
- calcProgram: RexProgram,
- expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
-
- val proj = calcProgram.getProjectList.asScala.toList
- val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
- val localExprs = calcProgram.getExprList.asScala.toList
- val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList
-
- proj
- .map(expression(_, inFields, Some(localExprs)))
- .zip(outFields).map { case (e, o) => {
- if (e != o) {
- e + " AS " + o
- } else {
- e
- }
- }
- }.mkString(", ")
- }
-
- private[flink] def calcOpName(
- calcProgram: RexProgram,
- expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
-
- val conditionStr = conditionToString(calcProgram, expression)
- val selectionStr = selectionToString(calcProgram, expression)
-
- s"${if (calcProgram.getCondition != null) {
- s"where: ($conditionStr), "
- } else {
- ""
- }}select: ($selectionStr)"
- }
-
- private[flink] def calcToString(
- calcProgram: RexProgram,
- expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
-
- val name = calcOpName(calcProgram, expression)
- s"Calc($name)"
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
deleted file mode 100644
index c986602..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexCall, RexNode}
-import org.apache.calcite.sql.SemiJoinType
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction}
-import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
-import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
-import org.apache.flink.table.functions.utils.TableSqlFunction
-import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector}
-import org.apache.flink.table.typeutils.TypeConverter._
-import org.apache.flink.table.api.{TableConfig, TableException}
-
-import scala.collection.JavaConverters._
-
-/**
- * Join a user-defined table function
- */
-trait FlinkCorrelate {
-
- /**
- * Creates the [[CorrelateFlatMapRunner]] to execute the join of input table
- * and user-defined table function.
- */
- private[flink] def correlateMapFunction(
- config: TableConfig,
- inputTypeInfo: TypeInformation[Any],
- udtfTypeInfo: TypeInformation[Any],
- rowType: RelDataType,
- joinType: SemiJoinType,
- rexCall: RexCall,
- condition: Option[RexNode],
- expectedType: Option[TypeInformation[Any]],
- pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping
- ruleDescription: String)
- : CorrelateFlatMapRunner[Any, Any] = {
-
- val returnType = determineReturnType(
- rowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- val flatMap = generateFunction(
- config,
- inputTypeInfo,
- udtfTypeInfo,
- returnType,
- rowType,
- joinType,
- rexCall,
- pojoFieldMapping,
- ruleDescription)
-
- val collector = generateCollector(
- config,
- inputTypeInfo,
- udtfTypeInfo,
- returnType,
- rowType,
- condition,
- pojoFieldMapping)
-
- new CorrelateFlatMapRunner[Any, Any](
- flatMap.name,
- flatMap.code,
- collector.name,
- collector.code,
- flatMap.returnType)
-
- }
-
- /**
- * Generates the flat map function to run the user-defined table function.
- */
- private def generateFunction(
- config: TableConfig,
- inputTypeInfo: TypeInformation[Any],
- udtfTypeInfo: TypeInformation[Any],
- returnType: TypeInformation[Any],
- rowType: RelDataType,
- joinType: SemiJoinType,
- rexCall: RexCall,
- pojoFieldMapping: Option[Array[Int]],
- ruleDescription: String)
- : GeneratedFunction[FlatMapFunction[Any, Any]] = {
-
- val functionGenerator = new CodeGenerator(
- config,
- false,
- inputTypeInfo,
- Some(udtfTypeInfo),
- None,
- pojoFieldMapping)
-
- val (input1AccessExprs, input2AccessExprs) = functionGenerator.generateCorrelateAccessExprs
-
- val collectorTerm = functionGenerator
- .addReusableConstructor(classOf[TableFunctionCollector[_]])
- .head
-
- val call = functionGenerator.generateExpression(rexCall)
- var body =
- s"""
- |${call.resultTerm}.setCollector($collectorTerm);
- |${call.code}
- |""".stripMargin
-
- if (joinType == SemiJoinType.LEFT) {
- // left outer join
-
- // in case of left outer join and the returned row of table function is empty,
- // fill all fields of row with null
- val input2NullExprs = input2AccessExprs.map { x =>
- GeneratedExpression(
- primitiveDefaultValue(x.resultType),
- ALWAYS_NULL,
- NO_CODE,
- x.resultType)
- }
- val outerResultExpr = functionGenerator.generateResultExpression(
- input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames.asScala)
- body +=
- s"""
- |boolean hasOutput = $collectorTerm.isCollected();
- |if (!hasOutput) {
- | ${outerResultExpr.code}
- | ${functionGenerator.collectorTerm}.collect(${outerResultExpr.resultTerm});
- |}
- |""".stripMargin
- } else if (joinType != SemiJoinType.INNER) {
- throw TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
- }
-
- functionGenerator.generateFunction(
- ruleDescription,
- classOf[FlatMapFunction[Any, Any]],
- body,
- returnType)
- }
-
- /**
- * Generates table function collector.
- */
- private[flink] def generateCollector(
- config: TableConfig,
- inputTypeInfo: TypeInformation[Any],
- udtfTypeInfo: TypeInformation[Any],
- returnType: TypeInformation[Any],
- rowType: RelDataType,
- condition: Option[RexNode],
- pojoFieldMapping: Option[Array[Int]])
- : GeneratedCollector = {
-
- val generator = new CodeGenerator(
- config,
- false,
- inputTypeInfo,
- Some(udtfTypeInfo),
- None,
- pojoFieldMapping)
-
- val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
-
- val crossResultExpr = generator.generateResultExpression(
- input1AccessExprs ++ input2AccessExprs,
- returnType,
- rowType.getFieldNames.asScala)
-
- val collectorCode = if (condition.isEmpty) {
- s"""
- |${crossResultExpr.code}
- |getCollector().collect(${crossResultExpr.resultTerm});
- |""".stripMargin
- } else {
- val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo)
- filterGenerator.input1Term = filterGenerator.input2Term
- val filterCondition = filterGenerator.generateExpression(condition.get)
- s"""
- |${filterGenerator.reuseInputUnboxingCode()}
- |${filterCondition.code}
- |if (${filterCondition.resultTerm}) {
- | ${crossResultExpr.code}
- | getCollector().collect(${crossResultExpr.resultTerm});
- |}
- |""".stripMargin
- }
-
- generator.generateTableFunctionCollector(
- "TableFunctionCollector",
- collectorCode,
- udtfTypeInfo)
- }
-
- private[flink] def selectToString(rowType: RelDataType): String = {
- rowType.getFieldNames.asScala.mkString(",")
- }
-
- private[flink] def correlateOpName(
- rexCall: RexCall,
- sqlFunction: TableSqlFunction,
- rowType: RelDataType)
- : String = {
-
- s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: ${selectToString(rowType)}"
- }
-
- private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = {
- val udtfName = sqlFunction.getName
- val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",")
- s"table($udtfName($operands))"
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
index a7765d1..7ad9bd5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
@@ -66,43 +66,6 @@ trait FlinkRel {
}
}
- private[flink] def getConversionMapper(
- config: TableConfig,
- nullableInput: Boolean,
- inputType: TypeInformation[Any],
- expectedType: TypeInformation[Any],
- conversionOperatorName: String,
- fieldNames: Seq[String],
- inputPojoFieldMapping: Option[Array[Int]] = None)
- : MapFunction[Any, Any] = {
-
- val generator = new CodeGenerator(
- config,
- nullableInput,
- inputType,
- None,
- inputPojoFieldMapping)
- val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
-
- val body =
- s"""
- |${conversion.code}
- |return ${conversion.resultTerm};
- |""".stripMargin
-
- val genFunction = generator.generateFunction(
- conversionOperatorName,
- classOf[MapFunction[Any, Any]],
- body,
- expectedType)
-
- new MapRunner[Any, Any](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
-
- }
-
private[flink] def estimateRowSize(rowType: RelDataType): Double = {
val fieldList = rowType.getFieldList
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
index 252bb2e..09262a6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
@@ -21,12 +21,12 @@ package org.apache.flink.table.plan.nodes.dataset
import org.apache.calcite.plan._
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.PojoTypeInfo
import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonScan
import org.apache.flink.table.plan.schema.FlinkTable
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -36,6 +36,7 @@ abstract class BatchScan(
traitSet: RelTraitSet,
table: RelOptTable)
extends TableScan(cluster, traitSet, table)
+ with CommonScan
with DataSetRel {
override def toString: String = {
@@ -48,50 +49,34 @@ abstract class BatchScan(
planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
}
- protected def convertToExpectedType(
+ protected def convertToInternalRow(
input: DataSet[Any],
flinkTable: FlinkTable[_],
- expectedType: Option[TypeInformation[Any]],
- config: TableConfig): DataSet[Any] = {
+ config: TableConfig)
+ : DataSet[Row] = {
val inputType = input.getType
- expectedType match {
+ val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
- // special case:
- // if efficient type usage is enabled and no expected type is set
- // we can simply forward the DataSet to the next operator.
- // however, we cannot forward PojoTypes as their fields don't have an order
- case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
- input
+ // conversion
+ if (needsConversion(inputType, internalType)) {
- case _ =>
- val determinedType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
+ val mapFunc = getConversionMapper(
+ config,
+ inputType,
+ internalType,
+ "DataSetSourceConversion",
+ getRowType.getFieldNames,
+ Some(flinkTable.fieldIndexes))
- // conversion
- if (determinedType != inputType) {
+ val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
- val mapFunc = getConversionMapper(
- config,
- nullableInput = false,
- inputType,
- determinedType,
- "DataSetSourceConversion",
- getRowType.getFieldNames,
- Some(flinkTable.fieldIndexes))
-
- val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
- input.map(mapFunc).name(opName)
- }
- // no conversion necessary, forward
- else {
- input
- }
+ input.map(mapFunc).name(opName)
+ }
+ // no conversion necessary, forward
+ else {
+ input.asInstanceOf[DataSet[Row]]
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 73dddc6..9b8e1ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.schema.TableSourceTable
import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.types.Row
/** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */
class BatchTableSourceScan(
@@ -62,13 +63,11 @@ class BatchTableSourceScan(
.item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val config = tableEnv.getConfig
val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]
- convertToExpectedType(inputDataSet, new TableSourceTable(tableSource), expectedType, config)
+ convertToInternalRow(inputDataSet, new TableSourceTable(tableSource), config)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
index 6771536..206e562 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -23,19 +23,15 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.plan.nodes.CommonAggregate
import org.apache.flink.table.runtime.aggregate.AggregateUtil
import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
-import org.apache.flink.table.typeutils.TypeConverter
-import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.types.Row
-import scala.collection.JavaConverters._
-
/**
* Flink RelNode which matches along with a LogicalAggregate.
*/
@@ -49,7 +45,7 @@ class DataSetAggregate(
grouping: Array[Int],
inGroupingSet: Boolean)
extends SingleRel(cluster, traitSet, inputNode)
- with FlinkAggregate
+ with CommonAggregate
with DataSetRel {
override def deriveRowType(): RelDataType = rowRelDataType
@@ -89,9 +85,7 @@ class DataSetAggregate(
planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val config = tableEnv.getConfig
@@ -109,15 +103,7 @@ class DataSetAggregate(
grouping,
inGroupingSet)
- val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
- tableEnv,
- // tell the input operator that this operator currently only supports Rows as input
- Some(TypeConverter.DEFAULT_ROW_TYPE))
-
- // get the output types
- val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala
- .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
- .toArray
+ val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
val prepareOpName = s"prepare select: ($aggString)"
@@ -125,46 +111,26 @@ class DataSetAggregate(
.map(mapFunction)
.name(prepareOpName)
- val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
-
- val result = {
- if (groupingKeys.length > 0) {
- // grouped aggregation
- val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
- s"select: ($aggString)"
-
- mappedInput.asInstanceOf[DataSet[Row]]
- .groupBy(groupingKeys: _*)
- .reduceGroup(groupReduceFunction)
- .returns(rowTypeInfo)
- .name(aggOpName)
- .asInstanceOf[DataSet[Any]]
- }
- else {
- // global aggregation
- val aggOpName = s"select:($aggString)"
- mappedInput.asInstanceOf[DataSet[Row]]
- .reduceGroup(groupReduceFunction)
- .returns(rowTypeInfo)
- .name(aggOpName)
- .asInstanceOf[DataSet[Any]]
- }
- }
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+ if (groupingKeys.length > 0) {
+ // grouped aggregation
+ val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
+ s"select: ($aggString)"
- // if the expected type is not a Row, inject a mapper to convert to the expected type
- expectedType match {
- case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
- val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
- result.map(getConversionMapper(
- config = config,
- nullableInput = false,
- inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
- expectedType = expectedType.get,
- conversionOperatorName = "DataSetAggregateConversion",
- fieldNames = getRowType.getFieldNames.asScala
- ))
- .name(mapName)
- case _ => result
+ mappedInput.asInstanceOf[DataSet[Row]]
+ .groupBy(groupingKeys: _*)
+ .reduceGroup(groupReduceFunction)
+ .returns(rowTypeInfo)
+ .name(aggOpName)
+ }
+ else {
+ // global aggregation
+ val aggOpName = s"select:($aggString)"
+ mappedInput.asInstanceOf[DataSet[Row]]
+ .reduceGroup(groupReduceFunction)
+ .returns(rowTypeInfo)
+ .name(aggOpName)
}
}
}