You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/02/13 16:51:24 UTC
[3/3] flink git commit: [FLINK-5662] [table] Rework internal type
handling of Table API
[FLINK-5662] [table] Rework internal type handling of Table API
This closes #3271.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bc6b225
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bc6b225
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bc6b225
Branch: refs/heads/master
Commit: 6bc6b225e55095eb8797db2903b0546410e7fdd9
Parents: 1ce10c8
Author: twalthr <tw...@apache.org>
Authored: Mon Feb 6 17:18:08 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Mon Feb 13 17:50:00 2017 +0100
----------------------------------------------------------------------
.../flink/table/api/BatchTableEnvironment.scala | 34 ++-
.../table/api/StreamTableEnvironment.scala | 41 ++--
.../apache/flink/table/api/TableConfig.scala | 24 --
.../flink/table/api/TableEnvironment.scala | 130 +++++++++-
.../flink/table/calcite/FlinkTypeFactory.scala | 17 +-
.../flink/table/codegen/CodeGenerator.scala | 33 +--
.../flink/table/codegen/ExpressionReducer.scala | 11 +-
.../apache/flink/table/codegen/generated.scala | 25 +-
.../flink/table/plan/logical/operators.scala | 18 +-
.../table/plan/nodes/CommonAggregate.scala | 69 ++++++
.../flink/table/plan/nodes/CommonCalc.scala | 152 ++++++++++++
.../table/plan/nodes/CommonCorrelate.scala | 229 ++++++++++++++++++
.../flink/table/plan/nodes/CommonScan.scala | 82 +++++++
.../flink/table/plan/nodes/FlinkAggregate.scala | 69 ------
.../flink/table/plan/nodes/FlinkCalc.scala | 172 -------------
.../flink/table/plan/nodes/FlinkCorrelate.scala | 233 ------------------
.../flink/table/plan/nodes/FlinkRel.scala | 37 ---
.../table/plan/nodes/dataset/BatchScan.scala | 61 ++---
.../nodes/dataset/BatchTableSourceScan.scala | 7 +-
.../plan/nodes/dataset/DataSetAggregate.scala | 82 ++-----
.../table/plan/nodes/dataset/DataSetCalc.scala | 27 +--
.../plan/nodes/dataset/DataSetCorrelate.scala | 12 +-
.../plan/nodes/dataset/DataSetIntersect.scala | 57 +----
.../table/plan/nodes/dataset/DataSetJoin.scala | 32 ++-
.../table/plan/nodes/dataset/DataSetMinus.scala | 57 +----
.../table/plan/nodes/dataset/DataSetRel.scala | 17 +-
.../table/plan/nodes/dataset/DataSetScan.scala | 8 +-
.../nodes/dataset/DataSetSingleRowJoin.scala | 48 ++--
.../table/plan/nodes/dataset/DataSetSort.scala | 49 +---
.../table/plan/nodes/dataset/DataSetUnion.scala | 24 +-
.../plan/nodes/dataset/DataSetValues.scala | 18 +-
.../nodes/dataset/DataSetWindowAggregate.scala | 68 ++----
.../nodes/datastream/DataStreamAggregate.scala | 239 ++++++++-----------
.../plan/nodes/datastream/DataStreamCalc.scala | 27 +--
.../nodes/datastream/DataStreamCorrelate.scala | 14 +-
.../plan/nodes/datastream/DataStreamRel.scala | 15 +-
.../plan/nodes/datastream/DataStreamScan.scala | 10 +-
.../plan/nodes/datastream/DataStreamUnion.scala | 6 +-
.../nodes/datastream/DataStreamValues.scala | 23 +-
.../plan/nodes/datastream/StreamScan.scala | 84 ++-----
.../datastream/StreamTableSourceScan.scala | 12 +-
.../table/runtime/aggregate/AggregateUtil.scala | 120 +++++-----
.../flink/table/typeutils/TypeConverter.scala | 156 ------------
.../api/java/batch/TableEnvironmentITCase.java | 7 +-
.../scala/batch/TableEnvironmentITCase.scala | 4 +-
.../batch/utils/TableProgramsTestBase.scala | 11 +-
.../expressions/utils/ExpressionTestBase.scala | 16 +-
47 files changed, 1184 insertions(+), 1503 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index dd0487a..2dec00e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -23,11 +23,12 @@ import _root_.java.util.concurrent.atomic.AtomicInteger
import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{Programs, RuleSet}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.Expression
@@ -135,7 +136,7 @@ abstract class BatchTableEnvironment(
private[flink] def explain(table: Table, extended: Boolean): String = {
val ast = table.getRelNode
val optimizedPlan = optimize(ast)
- val dataSet = translate[Row](optimizedPlan) (TypeExtractor.createTypeInfo(classOf[Row]))
+ val dataSet = translate[Row](optimizedPlan, ast.getRowType) (new GenericTypeInfo(classOf[Row]))
dataSet.output(new DiscardingOutputFormat[Row])
val env = dataSet.getExecutionEnvironment
val jasonSqlPlan = env.getExecutionPlan
@@ -250,28 +251,39 @@ abstract class BatchTableEnvironment(
* @return The [[DataSet]] that corresponds to the translated [[Table]].
*/
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
- val dataSetPlan = optimize(table.getRelNode)
- translate(dataSetPlan)
+ val relNode = table.getRelNode
+ val dataSetPlan = optimize(relNode)
+ translate(dataSetPlan, relNode.getRowType)
}
/**
- * Translates a logical [[RelNode]] into a [[DataSet]].
+ * Translates a logical [[RelNode]] into a [[DataSet]]. Converts to target type if necessary.
*
* @param logicalPlan The root node of the relational expression tree.
+ * @param logicalType The row type of the result. Since the logicalPlan can lose the
+ * field naming during optimization we pass the row type separately.
* @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
* @tparam A The type of the resulting [[DataSet]].
* @return The [[DataSet]] that corresponds to the translated [[Table]].
*/
- protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = {
+ protected def translate[A](
+ logicalPlan: RelNode,
+ logicalType: RelDataType)
+ (implicit tpe: TypeInformation[A]): DataSet[A] = {
TableEnvironment.validateType(tpe)
logicalPlan match {
case node: DataSetRel =>
- node.translateToPlan(
- this,
- Some(tpe.asInstanceOf[TypeInformation[Any]])
- ).asInstanceOf[DataSet[A]]
- case _ => ???
+ val plan = node.translateToPlan(this)
+ val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataSetSinkConversion")
+ conversion match {
+ case None => plan.asInstanceOf[DataSet[A]] // no conversion necessary
+ case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe")
+ }
+
+ case _ =>
+ throw TableException("Cannot generate DataSet due to an invalid logical plan. " +
+ "This is a bug and should not happen. Please file an issue.")
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 81e884d..19c4af1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -23,10 +23,11 @@ import _root_.java.util.concurrent.atomic.AtomicInteger
import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{Programs, RuleSet}
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.explain.PlanJsonParser
@@ -200,11 +201,11 @@ abstract class StreamTableEnvironment(
dataStream: DataStream[T],
fields: Array[Expression]): Unit = {
- val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields.toArray)
+ val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields)
val dataStreamTable = new DataStreamTable[T](
dataStream,
- fieldIndexes.toArray,
- fieldNames.toArray
+ fieldIndexes,
+ fieldNames
)
registerTableInternal(name, dataStreamTable)
}
@@ -255,30 +256,40 @@ abstract class StreamTableEnvironment(
* @return The [[DataStream]] that corresponds to the translated [[Table]].
*/
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
- val dataStreamPlan = optimize(table.getRelNode)
- translate(dataStreamPlan)
+ val relNode = table.getRelNode
+ val dataStreamPlan = optimize(relNode)
+ translate(dataStreamPlan, relNode.getRowType)
}
/**
* Translates a logical [[RelNode]] into a [[DataStream]].
*
* @param logicalPlan The root node of the relational expression tree.
+ * @param logicalType The row type of the result. Since the logicalPlan can lose the
+ * field naming during optimization we pass the row type separately.
* @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
* @tparam A The type of the resulting [[DataStream]].
* @return The [[DataStream]] that corresponds to the translated [[Table]].
*/
- protected def translate[A]
- (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+ protected def translate[A](
+ logicalPlan: RelNode,
+ logicalType: RelDataType)
+ (implicit tpe: TypeInformation[A]): DataStream[A] = {
TableEnvironment.validateType(tpe)
logicalPlan match {
case node: DataStreamRel =>
- node.translateToPlan(
- this,
- Some(tpe.asInstanceOf[TypeInformation[Any]])
- ).asInstanceOf[DataStream[A]]
- case _ => ???
+ val plan = node.translateToPlan(this)
+ val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataStreamSinkConversion")
+ conversion match {
+ case None => plan.asInstanceOf[DataStream[A]] // no conversion necessary
+ case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe")
+ }
+
+ case _ =>
+ throw TableException("Cannot generate DataStream due to an invalid logical plan. " +
+ "This is a bug and should not happen. Please file an issue.")
}
}
@@ -291,7 +302,9 @@ abstract class StreamTableEnvironment(
def explain(table: Table): String = {
val ast = table.getRelNode
val optimizedPlan = optimize(ast)
- val dataStream = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
+ val dataStream = translate[Row](
+ optimizedPlan,
+ ast.getRowType)(new GenericTypeInfo(classOf[Row]))
val env = dataStream.getExecutionEnvironment
val jsonSqlPlan = env.getExecutionPlan
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
index a8876a8..6448657 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala
@@ -37,12 +37,6 @@ class TableConfig {
private var nullCheck: Boolean = true
/**
- * Defines if efficient types (such as Tuple types or Atomic types)
- * should be used within operators where possible.
- */
- private var efficientTypeUsage = false
-
- /**
* Defines the configuration of Calcite for Table API and SQL queries.
*/
private var calciteConfig = CalciteConfig.DEFAULT
@@ -73,24 +67,6 @@ class TableConfig {
}
/**
- * Returns the usage of efficient types. If enabled, efficient types (such as Tuple types
- * or Atomic types) are used within operators where possible.
- *
- * NOTE: Currently, this is an experimental feature.
- */
- def getEfficientTypeUsage = efficientTypeUsage
-
- /**
- * Sets the usage of efficient types. If enabled, efficient types (such as Tuple types
- * or Atomic types) are used within operators where possible.
- *
- * NOTE: Currently, this is an experimental feature.
- */
- def setEfficientTypeUsage(efficientTypeUsage: Boolean): Unit = {
- this.efficientTypeUsage = efficientTypeUsage
- }
-
- /**
* Returns the current configuration of Calcite for Table API and SQL queries.
*/
def getCalciteConfig: CalciteConfig = calciteConfig
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index bcff1fb..b36441a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -31,27 +31,30 @@ import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.sql.util.ChainedSqlOperatorTable
import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets}
+import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
-import java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
+import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv}
import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
-import org.apache.flink.table.codegen.ExpressionReducer
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
import org.apache.flink.table.plan.cost.DataSetCostFactory
import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
-import org.apache.flink.table.plan.schema.{RelTable, TableSourceTable}
+import org.apache.flink.table.plan.schema.RelTable
+import org.apache.flink.table.runtime.MapRunner
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.types.Row
import _root_.scala.collection.JavaConverters._
@@ -410,7 +413,7 @@ abstract class TableEnvironment(val config: TableConfig) {
}
exprs.map {
case UnresolvedFieldReference(name) => (0, name)
- case _ => throw new TableException("Field reference expression expected.")
+ case _ => throw new TableException("Field reference expression requested.")
}
case t: TupleTypeInfo[A] =>
exprs.zipWithIndex.map {
@@ -466,6 +469,123 @@ abstract class TableEnvironment(val config: TableConfig) {
(fieldNames.toArray, fieldIndexes.toArray)
}
+ /**
+ * Creates a final converter that maps the internal row type to external type.
+ *
+ * @param physicalRowTypeInfo the input of the sink
+ * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+ * @param requestedTypeInfo the output type of the sink
+ * @param functionName name of the map function. Must not be unique but has to be a
+ * valid Java class identifier.
+ */
+  /**
+    * Creates a final converter that maps the internal row type to the external type.
+    *
+    * The physical row type produced by the translated plan is first checked against the
+    * logical row type. If the requested type is a generic [[Row]], no conversion is needed.
+    * Otherwise the requested type is validated against the logical field names and types and
+    * a code-generated [[MapFunction]] performing the conversion is returned.
+    *
+    * @param physicalRowTypeInfo the input of the sink
+    * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+    * @param requestedTypeInfo the output type of the sink
+    * @param functionName name of the map function. Need not be unique but has to be a
+    *                     valid Java class identifier.
+    * @return `None` if no conversion is necessary, otherwise the conversion function
+    * @throws TableException if the physical and logical field types differ, or if the
+    *                        requested type cannot hold the logical result
+    */
+  protected def sinkConversion[T](
+      physicalRowTypeInfo: TypeInformation[Row],
+      logicalRowType: RelDataType,
+      requestedTypeInfo: TypeInformation[T],
+      functionName: String)
+    : Option[MapFunction[Row, T]] = {
+
+    // validate that at least the field types of physical and logical type match
+    // we do that here to make sure that plan translation was correct
+    val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType)
+    if (physicalRowTypeInfo != logicalRowTypeInfo) {
+      throw TableException("The field types of physical and logical row types do not match. " +
+        "This is a bug and should not happen. Please file an issue.")
+    }
+
+    val isGenericRow = requestedTypeInfo.isInstanceOf[GenericTypeInfo[_]] &&
+      requestedTypeInfo.getTypeClass == classOf[Row]
+
+    if (isGenericRow) {
+      // requested type is a generic Row, no conversion needed
+      None
+    } else {
+      // convert to type information
+      val logicalFieldTypes = logicalRowType.getFieldList.asScala map { field =>
+        FlinkTypeFactory.toTypeInfo(field.getType)
+      }
+      // field names
+      val logicalFieldNames = logicalRowType.getFieldNames.asScala
+
+      validateRequestedType(requestedTypeInfo, logicalFieldNames, logicalFieldTypes)
+
+      // code generate MapFunction
+      val generator = new CodeGenerator(
+        config,
+        false, // nullableInput
+        physicalRowTypeInfo,
+        None,
+        None)
+
+      val conversion = generator.generateConverterResultExpression(
+        requestedTypeInfo,
+        logicalFieldNames)
+
+      val body =
+        s"""
+           |${conversion.code}
+           |return ${conversion.resultTerm};
+           |""".stripMargin
+
+      val genFunction = generator.generateFunction(
+        functionName,
+        classOf[MapFunction[Row, T]],
+        body,
+        requestedTypeInfo)
+
+      Some(new MapRunner[Row, T](
+        genFunction.name,
+        genFunction.code,
+        genFunction.returnType))
+    }
+  }
+
+  /**
+    * Checks that `requestedTypeInfo` can hold a result with the given logical field names and
+    * types. POJO fields are matched by name, tuple/case class/row fields by position, and an
+    * atomic type requires exactly one field of the same type.
+    *
+    * @throws TableException if the requested type does not match the logical result
+    */
+  private def validateRequestedType(
+      requestedTypeInfo: TypeInformation[_],
+      logicalFieldNames: Seq[String],
+      logicalFieldTypes: Seq[TypeInformation[_]])
+    : Unit = {
+
+    if (requestedTypeInfo.getArity != logicalFieldTypes.length) {
+      throw new TableException("Arity of result does not match requested type.")
+    }
+
+    requestedTypeInfo match {
+
+      // POJO type requested: fields are matched by name
+      case pt: PojoTypeInfo[_] =>
+        logicalFieldNames.zip(logicalFieldTypes) foreach {
+          case (fName, fType) =>
+            val pojoIdx = pt.getFieldIndex(fName)
+            if (pojoIdx < 0) {
+              throw new TableException(s"POJO does not define field name: $fName")
+            }
+            val requestedFieldType = pt.getTypeAt(pojoIdx)
+            if (fType != requestedFieldType) {
+              throw new TableException("Result field does not match requested type. " +
+                s"Requested: $requestedFieldType; Actual: $fType")
+            }
+        }
+
+      // Tuple/Case class/Row type requested: fields are matched by position
+      case tt: TupleTypeInfoBase[_] =>
+        logicalFieldTypes.zipWithIndex foreach {
+          case (fieldTypeInfo, i) =>
+            val requestedFieldType = tt.getTypeAt(i)
+            if (fieldTypeInfo != requestedFieldType) {
+              throw new TableException("Result field does not match requested type. " +
+                s"Requested: $requestedFieldType; Actual: $fieldTypeInfo")
+            }
+        }
+
+      // Atomic type requested: exactly one field of the same type
+      case at: AtomicType[_] =>
+        if (logicalFieldTypes.size != 1) {
+          throw new TableException("Requested result type is an atomic type but " +
+            "result has more or less than a single field.")
+        }
+        val fieldTypeInfo = logicalFieldTypes.head
+        if (fieldTypeInfo != at) {
+          throw new TableException("Result field does not match requested type. " +
+            s"Requested: $at; Actual: $fieldTypeInfo")
+        }
+
+      case _ =>
+        throw new TableException(s"Unsupported result type: $requestedTypeInfo")
+    }
+  }
+
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index f3e2f91..251be14 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -28,7 +28,7 @@ import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo}
import org.apache.flink.api.java.typeutils.ValueTypeInfo._
import org.apache.flink.table.api.TableException
import org.apache.flink.table.plan.schema.{CompositeRelDataType, GenericRelDataType}
@@ -36,8 +36,10 @@ import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple
import org.apache.flink.table.plan.schema.ArrayRelDataType
import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName
+import org.apache.flink.types.Row
import scala.collection.mutable
+import scala.collection.JavaConverters._
/**
* Flink specific type factory that represents the interface between Flink's [[TypeInformation]]
@@ -167,6 +169,19 @@ object FlinkTypeFactory {
throw TableException(s"Type is not supported: $t")
}
+ /**
+ * Converts a Calcite logical record into a Flink type information.
+ */
+  /**
+    * Converts a Calcite row type into a Flink [[RowTypeInfo]] that carries the same field
+    * names and the Flink equivalents of the field types, in field order.
+    *
+    * @param logicalRowType the Calcite record type to convert
+    * @return a [[RowTypeInfo]] describing the record, exposed as `TypeInformation[Row]`
+    */
+  def toInternalRowTypeInfo(logicalRowType: RelDataType): TypeInformation[Row] = {
+    val flinkTypes: Array[TypeInformation[_]] = logicalRowType.getFieldList.asScala
+      .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
+      .toArray
+    val flinkNames: Array[String] = logicalRowType.getFieldNames.asScala.toArray
+    new RowTypeInfo(flinkTypes, flinkNames)
+  }
+
def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match {
case BOOLEAN => BOOLEAN_TYPE_INFO
case TINYINT => BYTE_TYPE_INFO
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index d49d7a0..c679bd8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -40,8 +40,8 @@ import org.apache.flink.table.codegen.calls.FunctionGenerator
import org.apache.flink.table.codegen.calls.ScalarOperators._
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.runtime.TableFunctionCollector
-import org.apache.flink.table.typeutils.TypeConverter
import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -62,8 +62,8 @@ import scala.collection.mutable
class CodeGenerator(
config: TableConfig,
nullableInput: Boolean,
- input1: TypeInformation[Any],
- input2: Option[TypeInformation[Any]] = None,
+ input1: TypeInformation[_ <: Any],
+ input2: Option[TypeInformation[_ <: Any]] = None,
input1PojoFieldMapping: Option[Array[Int]] = None,
input2PojoFieldMapping: Option[Array[Int]] = None)
extends RexVisitor[GeneratedExpression] {
@@ -112,7 +112,7 @@ class CodeGenerator(
* @param config configuration that determines runtime behavior
*/
def this(config: TableConfig) =
- this(config, false, TypeConverter.DEFAULT_ROW_TYPE, None, None)
+ this(config, false, new RowTypeInfo(), None, None)
// set of member statements that will be added only once
// we use a LinkedHashSet to keep the insertion order
@@ -224,15 +224,16 @@ class CodeGenerator(
* @param bodyCode code contents of the SAM (Single Abstract Method). Inputs, collector, or
* output record can be accessed via the given term methods.
* @param returnType expected return type
- * @tparam T Flink Function to be generated.
+ * @tparam F Flink Function to be generated.
+ * @tparam T Return type of the Flink Function.
* @return instance of GeneratedFunction
*/
- def generateFunction[T <: Function](
+ def generateFunction[F <: Function, T <: Any](
name: String,
- clazz: Class[T],
+ clazz: Class[F],
bodyCode: String,
- returnType: TypeInformation[Any])
- : GeneratedFunction[T] = {
+ returnType: TypeInformation[T])
+ : GeneratedFunction[F, T] = {
val funcName = newName(name)
// Janino does not support generics, that's why we need
@@ -298,14 +299,14 @@ class CodeGenerator(
* valid Java class identifier.
* @param records code for creating records
* @param returnType expected return type
- * @tparam T Flink Function to be generated.
+ * @tparam T Return type of the Flink Function.
* @return instance of GeneratedFunction
*/
- def generateValuesInputFormat[T](
+ def generateValuesInputFormat[T <: Row](
name: String,
records: Seq[String],
- returnType: TypeInformation[Any])
- : GeneratedFunction[GenericInputFormat[T]] = {
+ returnType: TypeInformation[T])
+ : GeneratedInput[GenericInputFormat[T], T] = {
val funcName = newName(name)
addReusableOutRecord(returnType)
@@ -343,7 +344,7 @@ class CodeGenerator(
}
""".stripMargin
- GeneratedFunction[GenericInputFormat[T]](funcName, returnType, funcCode)
+ GeneratedInput(funcName, returnType, funcCode)
}
/**
@@ -1094,7 +1095,7 @@ class CodeGenerator(
// ----------------------------------------------------------------------------------------------
private def generateInputAccess(
- inputType: TypeInformation[Any],
+ inputType: TypeInformation[_ <: Any],
inputTerm: String,
index: Int,
pojoFieldMapping: Option[Array[Int]])
@@ -1122,7 +1123,7 @@ class CodeGenerator(
}
private def generateNullableInputFieldAccess(
- inputType: TypeInformation[Any],
+ inputType: TypeInformation[_ <: Any],
inputTerm: String,
index: Int,
pojoFieldMapping: Option[Array[Int]])
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index 94007de..0f1de21 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -24,11 +24,10 @@ import org.apache.calcite.plan.RelOptPlanner
import org.apache.calcite.rex.{RexBuilder, RexNode}
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.typeutils.TypeConverter
import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -39,7 +38,7 @@ import scala.collection.JavaConverters._
class ExpressionReducer(config: TableConfig)
extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] {
- private val EMPTY_ROW_INFO = TypeConverter.DEFAULT_ROW_TYPE
+ private val EMPTY_ROW_INFO = new RowTypeInfo()
private val EMPTY_ROW = new Row(0)
override def reduce(
@@ -82,14 +81,14 @@ class ExpressionReducer(config: TableConfig)
resultType.getFieldNames,
literals)
- val generatedFunction = generator.generateFunction[MapFunction[Row, Row]](
+ val generatedFunction = generator.generateFunction[MapFunction[Row, Row], Row](
"ExpressionReducer",
classOf[MapFunction[Row, Row]],
s"""
|${result.code}
|return ${result.resultTerm};
|""".stripMargin,
- resultType.asInstanceOf[TypeInformation[Any]])
+ resultType)
val clazz = compile(getClass.getClassLoader, generatedFunction.name, generatedFunction.code)
val function = clazz.newInstance()
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
index b4c293d..271f686 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
@@ -18,6 +18,9 @@
package org.apache.flink.table.codegen
+import org.apache.flink.api.common.functions
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
/**
@@ -41,14 +44,32 @@ object GeneratedExpression {
}
/**
- * Describes a generated [[org.apache.flink.api.common.functions.Function]]
+ * Describes a generated [[functions.Function]]
*
* @param name class name of the generated Function.
* @param returnType the type information of the result type
* @param code code of the generated Function.
+ * @tparam F type of function
* @tparam T type of function
*/
-case class GeneratedFunction[T](name: String, returnType: TypeInformation[Any], code: String)
+case class GeneratedFunction[F <: Function, T](
+    name: String, // class name of the generated function
+    returnType: TypeInformation[T], // type information of the function's result
+    code: String) // source code of the generated function
+
+/**
+  * Describes a generated [[InputFormat]].
+  *
+  * @param name class name of the generated input format.
+  * @param returnType the type information of the records produced by the input format
+  * @param code code of the generated input format.
+  * @tparam F type of the generated input format
+  * @tparam T type of the records produced by the input format
+  */
+case class GeneratedInput[F <: InputFormat[_, _], T <: Any](
+    name: String,
+    returnType: TypeInformation[T],
+    code: String)
/**
* Describes a generated [[org.apache.flink.util.Collector]].
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 3ba0285..20f810a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -22,14 +22,13 @@ import java.util
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.CorrelationId
-import org.apache.calcite.rel.logical.{LogicalProject, LogicalTableFunctionScan}
+import org.apache.calcite.rel.core.{CorrelationId, JoinRelType}
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
import org.apache.calcite.rex.{RexInputRef, RexNode}
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.table._
import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment, UnresolvedException}
import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
import org.apache.flink.table.expressions._
@@ -37,7 +36,6 @@ import org.apache.flink.table.functions.TableFunction
import org.apache.flink.table.functions.utils.TableSqlFunction
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
-import org.apache.flink.table.typeutils.TypeConverter
import org.apache.flink.table.validate.{ValidationFailure, ValidationSuccess}
import scala.collection.JavaConverters._
@@ -426,11 +424,18 @@ case class Join(
}
relBuilder.join(
- TypeConverter.flinkJoinTypeToRelType(joinType),
+ convertJoinType(joinType),
condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)),
corSet.asJava)
}
+  /**
+   * Translates a Flink [[JoinType]] into the corresponding Calcite [[JoinRelType]]
+   * used when building the logical join.
+   */
+  private def convertJoinType(joinType: JoinType): JoinRelType = joinType match {
+    case JoinType.FULL_OUTER => JoinRelType.FULL
+    case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
+    case JoinType.LEFT_OUTER => JoinRelType.LEFT
+    case JoinType.INNER => JoinRelType.INNER
+  }
+
+  /** Names of output fields that occur in both the left and the right input. */
private def ambiguousName: Set[String] =
left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet)
@@ -481,13 +486,12 @@ case class Join(
if (checkIfFilterCondition(x)) {
localPredicateFound = true
}
- case x: BinaryComparison => {
+ case x: BinaryComparison =>
if (checkIfFilterCondition(x)) {
localPredicateFound = true
} else {
nonEquiJoinPredicateFound = true
}
- }
case x => failValidation(
s"Unsupported condition type: ${x.getClass.getSimpleName}. Condition: $x")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
new file mode 100644
index 0000000..3883b14
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+
+import scala.collection.JavaConverters._
+
+trait CommonAggregate {
+
+  /**
+   * Renders the grouping keys as a comma-separated list of input field names.
+   *
+   * @param inputType row type of the aggregation input
+   * @param grouping  indices of the grouping fields in `inputType`
+   */
+  private[flink] def groupingToString(inputType: RelDataType, grouping: Array[Int]): String = {
+    val fieldNames = inputType.getFieldNames
+    grouping.map(idx => fieldNames.get(idx)).mkString(", ")
+  }
+
+  /**
+   * Renders the output of an aggregation: grouping keys, aggregate calls and window
+   * properties, in this order. Each rendered expression is paired with its output field
+   * name and written as `expr AS name` whenever the two differ.
+   *
+   * @param inputType       row type of the aggregation input
+   * @param grouping        indices of the grouping fields in `inputType`
+   * @param rowType         row type of the aggregation output
+   * @param namedAggregates aggregate calls with their output names
+   * @param namedProperties window properties with their output names
+   */
+  private[flink] def aggregationToString(
+      inputType: RelDataType,
+      grouping: Array[Int],
+      rowType: RelDataType,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      namedProperties: Seq[NamedWindowProperty])
+    : String = {
+
+    val inFields = inputType.getFieldNames.asScala
+    val outFields = rowType.getFieldNames.asScala
+
+    // an aggregate call is shown with its first argument only, or "*" without arguments
+    def renderAggregate(call: AggregateCall): String = {
+      val argument =
+        if (call.getArgList.isEmpty) "*" else inFields(call.getArgList.get(0))
+      s"${call.getAggregation}($argument)"
+    }
+
+    val groupExprs = grouping.toSeq.map(inFields(_))
+    val aggExprs = namedAggregates.map(namedAgg => renderAggregate(namedAgg.getKey))
+    val propExprs = namedProperties.map(_.property.toString)
+
+    (groupExprs ++ aggExprs ++ propExprs)
+      .zip(outFields)
+      .map { case (expr, alias) => if (expr == alias) expr else s"$expr AS $alias" }
+      .mkString(", ")
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
new file mode 100644
index 0000000..3f46258
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexNode, RexProgram}
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
+import org.apache.flink.table.runtime.FlatMapRunner
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+trait CommonCalc {
+
+  /**
+   * Generates the Java body of a flat map function evaluating a [[RexProgram]]: the optional
+   * filter condition and the projection. Results are emitted through the generator's
+   * collector term.
+   *
+   * @param generator   code generator for the calc input
+   * @param inputType   type of the input rows (currently not used by the generated body)
+   * @param rowType     Calcite row type of the calc output
+   * @param calcProgram program holding the condition and the projection
+   * @param config      table configuration (currently not used by the generated body)
+   * @return generated function body
+   */
+  private[flink] def functionBody(
+      generator: CodeGenerator,
+      inputType: TypeInformation[Row],
+      rowType: RelDataType,
+      calcProgram: RexProgram,
+      config: TableConfig)
+    : String = {
+
+    val outputType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
+    val projectExprs = calcProgram.getProjectList.asScala.map(calcProgram.expandLocalRef(_))
+    val projection = generator.generateResultExpression(
+      outputType,
+      rowType.getFieldNames.asScala,
+      projectExprs)
+
+    Option(calcProgram.getCondition) match {
+
+      // projection only
+      case None =>
+        s"""
+           |${projection.code}
+           |${generator.collectorTerm}.collect(${projection.resultTerm});
+           |""".stripMargin
+
+      case Some(condition) =>
+        val filterCondition = generator.generateExpression(calcProgram.expandLocalRef(condition))
+        if (projection == null) {
+          // filter only: the input record is forwarded unchanged
+          s"""
+             |${filterCondition.code}
+             |if (${filterCondition.resultTerm}) {
+             | ${generator.collectorTerm}.collect(${generator.input1Term});
+             |}
+             |""".stripMargin
+        } else {
+          // filter followed by projection
+          s"""
+             |${filterCondition.code}
+             |if (${filterCondition.resultTerm}) {
+             | ${projection.code}
+             | ${generator.collectorTerm}.collect(${projection.resultTerm});
+             |}
+             |""".stripMargin
+        }
+    }
+  }
+
+  /**
+   * Wraps the generated flat map code into a [[FlatMapRunner]].
+   */
+  private[flink] def calcMapFunction(
+      genFunction: GeneratedFunction[FlatMapFunction[Row, Row], Row])
+    : RichFlatMapFunction[Row, Row] =
+    new FlatMapRunner[Row, Row](genFunction.name, genFunction.code, genFunction.returnType)
+
+  /**
+   * Renders the program's condition, or an empty string if it has none.
+   */
+  private[flink] def conditionToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+    val inputFieldNames = calcProgram.getInputRowType.getFieldNames.asScala.toList
+    val localExprs = calcProgram.getExprList.asScala.toList
+
+    Option(calcProgram.getCondition)
+      .map(cond => expression(cond, inputFieldNames, Some(localExprs)))
+      .getOrElse("")
+  }
+
+  /**
+   * Renders the projection as a comma-separated list; an expression whose rendering differs
+   * from its output field name is written as `expr AS name`.
+   */
+  private[flink] def selectionToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+    val projectList = calcProgram.getProjectList.asScala.toList
+    val inputFieldNames = calcProgram.getInputRowType.getFieldNames.asScala.toList
+    val localExprs = calcProgram.getExprList.asScala.toList
+    val outputFieldNames = calcProgram.getOutputRowType.getFieldNames.asScala.toList
+
+    val rendered = projectList.map(expression(_, inputFieldNames, Some(localExprs)))
+    rendered
+      .zip(outputFieldNames)
+      .map { case (expr, name) => if (expr == name) expr else s"$expr AS $name" }
+      .mkString(", ")
+  }
+
+  /**
+   * Builds the operator name: `where: (...), select: (...)`, with the where part only
+   * present if the program has a condition.
+   */
+  private[flink] def calcOpName(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+    val conditionStr = conditionToString(calcProgram, expression)
+    val selectionStr = selectionToString(calcProgram, expression)
+
+    val wherePart =
+      if (calcProgram.getCondition == null) "" else s"where: ($conditionStr), "
+    s"${wherePart}select: ($selectionStr)"
+  }
+
+  /**
+   * Renders the calc as `Calc(<operator name>)`.
+   */
+  private[flink] def calcToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String =
+    s"Calc(${calcOpName(calcProgram, expression)})"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
new file mode 100644
index 0000000..61b7ffb
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexCall, RexNode}
+import org.apache.calcite.sql.SemiJoinType
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableConfig, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
+import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction}
+import org.apache.flink.table.functions.utils.TableSqlFunction
+import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector}
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+ * Helper methods to generate and describe the join of an input with the records emitted
+ * by a user-defined table function (UDTF).
+ */
+trait CommonCorrelate {
+
+  /**
+   * Creates the [[CorrelateFlatMapRunner]] to execute the join of input table
+   * and user-defined table function.
+   *
+   * @param config           table configuration passed to the code generators
+   * @param inputTypeInfo    type information of the input rows
+   * @param udtfTypeInfo     type information of the records emitted by the table function
+   * @param rowType          Calcite row type of the join result
+   * @param joinType         join type; only INNER and LEFT are supported
+   * @param rexCall          call of the user-defined table function
+   * @param condition        optional condition evaluated on each record emitted by the UDTF
+   * @param pojoFieldMapping udtf return type pojo field mapping
+   * @param ruleDescription  passed as the name of the generated flat map function
+   * @return runner holding the generated flat map function and collector code
+   */
+  private[flink] def correlateMapFunction(
+      config: TableConfig,
+      inputTypeInfo: TypeInformation[Row],
+      udtfTypeInfo: TypeInformation[Any],
+      rowType: RelDataType,
+      joinType: SemiJoinType,
+      rexCall: RexCall,
+      condition: Option[RexNode],
+      pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping
+      ruleDescription: String)
+    : CorrelateFlatMapRunner[Row, Row] = {
+
+    val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
+
+    val flatMap = generateFunction(
+      config,
+      inputTypeInfo,
+      udtfTypeInfo,
+      returnType,
+      rowType,
+      joinType,
+      rexCall,
+      pojoFieldMapping,
+      ruleDescription)
+
+    val collector = generateCollector(
+      config,
+      inputTypeInfo,
+      udtfTypeInfo,
+      returnType,
+      rowType,
+      condition,
+      pojoFieldMapping)
+
+    new CorrelateFlatMapRunner[Row, Row](
+      flatMap.name,
+      flatMap.code,
+      collector.name,
+      collector.code,
+      flatMap.returnType)
+  }
+
+  /**
+   * Generates the flat map function that invokes the user-defined table function for each
+   * input row. For a LEFT join, a row whose UDTF fields are all null is emitted when the
+   * table function did not collect any record.
+   *
+   * @throws TableException if `joinType` is neither INNER nor LEFT
+   */
+  private def generateFunction(
+      config: TableConfig,
+      inputTypeInfo: TypeInformation[Row],
+      udtfTypeInfo: TypeInformation[Any],
+      returnType: TypeInformation[Row],
+      rowType: RelDataType,
+      joinType: SemiJoinType,
+      rexCall: RexCall,
+      pojoFieldMapping: Option[Array[Int]],
+      ruleDescription: String)
+    : GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
+
+    val functionGenerator = new CodeGenerator(
+      config,
+      false,
+      inputTypeInfo,
+      Some(udtfTypeInfo),
+      None,
+      pojoFieldMapping)
+
+    val (input1AccessExprs, input2AccessExprs) = functionGenerator.generateCorrelateAccessExprs
+
+    val collectorTerm = functionGenerator
+      .addReusableConstructor(classOf[TableFunctionCollector[_]])
+      .head
+
+    val call = functionGenerator.generateExpression(rexCall)
+    val callCode =
+      s"""
+         |${call.resultTerm}.setCollector($collectorTerm);
+         |${call.code}
+         |""".stripMargin
+
+    val outerJoinCode = joinType match {
+      case SemiJoinType.INNER => ""
+
+      case SemiJoinType.LEFT =>
+        // in case of left outer join and the returned row of table function is empty,
+        // fill all fields of row with null
+        val input2NullExprs = input2AccessExprs.map { x =>
+          GeneratedExpression(
+            primitiveDefaultValue(x.resultType),
+            ALWAYS_NULL,
+            NO_CODE,
+            x.resultType)
+        }
+        val outerResultExpr = functionGenerator.generateResultExpression(
+          input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames.asScala)
+        s"""
+           |boolean hasOutput = $collectorTerm.isCollected();
+           |if (!hasOutput) {
+           | ${outerResultExpr.code}
+           | ${functionGenerator.collectorTerm}.collect(${outerResultExpr.resultTerm});
+           |}
+           |""".stripMargin
+
+      case _ =>
+        throw TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
+    }
+
+    functionGenerator.generateFunction(
+      ruleDescription,
+      classOf[FlatMapFunction[Row, Row]],
+      callCode + outerJoinCode,
+      returnType)
+  }
+
+  /**
+   * Generates the table function collector. It combines the current input row with each
+   * record emitted by the table function. If a condition is given, only records satisfying
+   * it are forwarded.
+   */
+  private[flink] def generateCollector(
+      config: TableConfig,
+      inputTypeInfo: TypeInformation[Row],
+      udtfTypeInfo: TypeInformation[Any],
+      returnType: TypeInformation[Row],
+      rowType: RelDataType,
+      condition: Option[RexNode],
+      pojoFieldMapping: Option[Array[Int]])
+    : GeneratedCollector = {
+
+    val generator = new CodeGenerator(
+      config,
+      false,
+      inputTypeInfo,
+      Some(udtfTypeInfo),
+      None,
+      pojoFieldMapping)
+
+    val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
+
+    val crossResultExpr = generator.generateResultExpression(
+      input1AccessExprs ++ input2AccessExprs,
+      returnType,
+      rowType.getFieldNames.asScala)
+
+    val collectorCode = condition match {
+      case None =>
+        s"""
+           |${crossResultExpr.code}
+           |getCollector().collect(${crossResultExpr.resultTerm});
+           |""".stripMargin
+
+      case Some(filter) =>
+        // The condition is generated with the UDTF type as the only input, and that input is
+        // then bound to the input2 term (the UDTF record). The UDTF's POJO field mapping must
+        // be passed here too, matching the mapping used for the UDTF input above; without it
+        // POJO fields are accessed without the mapping.
+        val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo, None, pojoFieldMapping)
+        filterGenerator.input1Term = filterGenerator.input2Term
+        val filterCondition = filterGenerator.generateExpression(filter)
+        s"""
+           |${filterGenerator.reuseInputUnboxingCode()}
+           |${filterCondition.code}
+           |if (${filterCondition.resultTerm}) {
+           | ${crossResultExpr.code}
+           | getCollector().collect(${crossResultExpr.resultTerm});
+           |}
+           |""".stripMargin
+    }
+
+    generator.generateTableFunctionCollector(
+      "TableFunctionCollector",
+      collectorCode,
+      udtfTypeInfo)
+  }
+
+  /** Renders the output field names as a comma-separated list. */
+  private[flink] def selectToString(rowType: RelDataType): String = {
+    rowType.getFieldNames.asScala.mkString(",")
+  }
+
+  /** Builds the operator name from the table function call and the selected fields. */
+  private[flink] def correlateOpName(
+      rexCall: RexCall,
+      sqlFunction: TableSqlFunction,
+      rowType: RelDataType)
+    : String = {
+
+    s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: ${selectToString(rowType)}"
+  }
+
+  /** Renders the table function call as `table(name(operand, ...))`. */
+  private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = {
+    val udtfName = sqlFunction.getName
+    val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",")
+    s"table($udtfName($operands))"
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
new file mode 100644
index 0000000..274b602
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.types.Row
+
+/**
+ * Common class for batch and stream scans.
+ */
+trait CommonScan {
+
+  /**
+   * Decides whether incoming records must be converted into the internal row type.
+   * Records are used as-is only if both type informations are equal.
+   *
+   * @param externalTypeInfo type information of the incoming records
+   * @param internalTypeInfo internal row type information
+   * @return true if the two types differ and a conversion is required
+   */
+  private[flink] def needsConversion(
+      externalTypeInfo: TypeInformation[Any],
+      internalTypeInfo: TypeInformation[Row])
+    : Boolean = !(externalTypeInfo == internalTypeInfo)
+
+  /**
+   * Generates a [[MapFunction]] that converts records of `inputType` into rows of
+   * `expectedType` with the given field names.
+   *
+   * @param config                 table configuration for the code generator
+   * @param inputType              type information of the records to convert
+   * @param expectedType           row type produced by the mapper
+   * @param conversionOperatorName name passed to the generated function
+   * @param fieldNames             field names of the produced rows
+   * @param inputPojoFieldMapping  optional field mapping for POJO inputs
+   * @return a [[MapRunner]] wrapping the generated conversion code
+   */
+  private[flink] def getConversionMapper(
+      config: TableConfig,
+      inputType: TypeInformation[Any],
+      expectedType: TypeInformation[Row],
+      conversionOperatorName: String,
+      fieldNames: Seq[String],
+      inputPojoFieldMapping: Option[Array[Int]] = None)
+    : MapFunction[Any, Row] = {
+
+    val converterGenerator =
+      new CodeGenerator(config, false, inputType, None, inputPojoFieldMapping)
+    val converted = converterGenerator.generateConverterResultExpression(expectedType, fieldNames)
+
+    val mapper = converterGenerator.generateFunction(
+      conversionOperatorName,
+      classOf[MapFunction[Any, Row]],
+      s"""
+         |${converted.code}
+         |return ${converted.resultTerm};
+         |""".stripMargin,
+      expectedType)
+
+    new MapRunner[Any, Row](mapper.name, mapper.code, mapper.returnType)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala
deleted file mode 100644
index 7290594..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.flink.table.calcite.FlinkRelBuilder
-import FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.table.runtime.aggregate.AggregateUtil._
-
-import scala.collection.JavaConverters._
-
-trait FlinkAggregate {
-
- private[flink] def groupingToString(inputType: RelDataType, grouping: Array[Int]): String = {
-
- val inFields = inputType.getFieldNames.asScala
- grouping.map( inFields(_) ).mkString(", ")
- }
-
- private[flink] def aggregationToString(
- inputType: RelDataType,
- grouping: Array[Int],
- rowType: RelDataType,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- namedProperties: Seq[NamedWindowProperty])
- : String = {
-
- val inFields = inputType.getFieldNames.asScala
- val outFields = rowType.getFieldNames.asScala
-
- val groupStrings = grouping.map( inFields(_) )
-
- val aggs = namedAggregates.map(_.getKey)
- val aggStrings = aggs.map( a => s"${a.getAggregation}(${
- if (a.getArgList.size() > 0) {
- inFields(a.getArgList.get(0))
- } else {
- "*"
- }
- })")
-
- val propStrings = namedProperties.map(_.property.toString)
-
- (groupStrings ++ aggStrings ++ propStrings).zip(outFields).map {
- case (f, o) => if (f == o) {
- f
- } else {
- s"$f AS $o"
- }
- }.mkString(", ")
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala
deleted file mode 100644
index 5ebd3ee..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexNode, RexProgram}
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
-import org.apache.flink.table.runtime.FlatMapRunner
-import org.apache.flink.table.typeutils.TypeConverter._
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-trait FlinkCalc {
-
- private[flink] def functionBody(
- generator: CodeGenerator,
- inputType: TypeInformation[Any],
- rowType: RelDataType,
- calcProgram: RexProgram,
- config: TableConfig,
- expectedType: Option[TypeInformation[Any]]): String = {
-
- val returnType = determineReturnType(
- rowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- val condition = calcProgram.getCondition
- val expandedExpressions = calcProgram.getProjectList.map(
- expr => calcProgram.expandLocalRef(expr))
- val projection = generator.generateResultExpression(
- returnType,
- rowType.getFieldNames,
- expandedExpressions)
-
- // only projection
- if (condition == null) {
- s"""
- |${projection.code}
- |${generator.collectorTerm}.collect(${projection.resultTerm});
- |""".stripMargin
- }
- else {
- val filterCondition = generator.generateExpression(
- calcProgram.expandLocalRef(calcProgram.getCondition))
- // only filter
- if (projection == null) {
- // conversion
- if (inputType != returnType) {
- val conversion = generator.generateConverterResultExpression(
- returnType,
- rowType.getFieldNames)
-
- s"""
- |${filterCondition.code}
- |if (${filterCondition.resultTerm}) {
- | ${conversion.code}
- | ${generator.collectorTerm}.collect(${conversion.resultTerm});
- |}
- |""".stripMargin
- }
- // no conversion
- else {
- s"""
- |${filterCondition.code}
- |if (${filterCondition.resultTerm}) {
- | ${generator.collectorTerm}.collect(${generator.input1Term});
- |}
- |""".stripMargin
- }
- }
- // both filter and projection
- else {
- s"""
- |${filterCondition.code}
- |if (${filterCondition.resultTerm}) {
- | ${projection.code}
- | ${generator.collectorTerm}.collect(${projection.resultTerm});
- |}
- |""".stripMargin
- }
- }
- }
-
- private[flink] def calcMapFunction(
- genFunction: GeneratedFunction[FlatMapFunction[Any, Any]]): RichFlatMapFunction[Any, Any] = {
-
- new FlatMapRunner[Any, Any](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
- }
-
- private[flink] def conditionToString(
- calcProgram: RexProgram,
- expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
-
- val cond = calcProgram.getCondition
- val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
- val localExprs = calcProgram.getExprList.asScala.toList
-
- if (cond != null) {
- expression(cond, inFields, Some(localExprs))
- } else {
- ""
- }
- }
-
- private[flink] def selectionToString(
- calcProgram: RexProgram,
- expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
-
- val proj = calcProgram.getProjectList.asScala.toList
- val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
- val localExprs = calcProgram.getExprList.asScala.toList
- val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList
-
- proj
- .map(expression(_, inFields, Some(localExprs)))
- .zip(outFields).map { case (e, o) => {
- if (e != o) {
- e + " AS " + o
- } else {
- e
- }
- }
- }.mkString(", ")
- }
-
- private[flink] def calcOpName(
- calcProgram: RexProgram,
- expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
-
- val conditionStr = conditionToString(calcProgram, expression)
- val selectionStr = selectionToString(calcProgram, expression)
-
- s"${if (calcProgram.getCondition != null) {
- s"where: ($conditionStr), "
- } else {
- ""
- }}select: ($selectionStr)"
- }
-
- private[flink] def calcToString(
- calcProgram: RexProgram,
- expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
-
- val name = calcOpName(calcProgram, expression)
- s"Calc($name)"
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
deleted file mode 100644
index c986602..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexCall, RexNode}
-import org.apache.calcite.sql.SemiJoinType
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction}
-import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
-import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
-import org.apache.flink.table.functions.utils.TableSqlFunction
-import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector}
-import org.apache.flink.table.typeutils.TypeConverter._
-import org.apache.flink.table.api.{TableConfig, TableException}
-
-import scala.collection.JavaConverters._
-
-/**
- * Join a user-defined table function
- */
-trait FlinkCorrelate {
-
- /**
- * Creates the [[CorrelateFlatMapRunner]] to execute the join of input table
- * and user-defined table function.
- */
- private[flink] def correlateMapFunction(
- config: TableConfig,
- inputTypeInfo: TypeInformation[Any],
- udtfTypeInfo: TypeInformation[Any],
- rowType: RelDataType,
- joinType: SemiJoinType,
- rexCall: RexCall,
- condition: Option[RexNode],
- expectedType: Option[TypeInformation[Any]],
- pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping
- ruleDescription: String)
- : CorrelateFlatMapRunner[Any, Any] = {
-
- val returnType = determineReturnType(
- rowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- val flatMap = generateFunction(
- config,
- inputTypeInfo,
- udtfTypeInfo,
- returnType,
- rowType,
- joinType,
- rexCall,
- pojoFieldMapping,
- ruleDescription)
-
- val collector = generateCollector(
- config,
- inputTypeInfo,
- udtfTypeInfo,
- returnType,
- rowType,
- condition,
- pojoFieldMapping)
-
- new CorrelateFlatMapRunner[Any, Any](
- flatMap.name,
- flatMap.code,
- collector.name,
- collector.code,
- flatMap.returnType)
-
- }
-
- /**
- * Generates the flat map function to run the user-defined table function.
- */
- private def generateFunction(
- config: TableConfig,
- inputTypeInfo: TypeInformation[Any],
- udtfTypeInfo: TypeInformation[Any],
- returnType: TypeInformation[Any],
- rowType: RelDataType,
- joinType: SemiJoinType,
- rexCall: RexCall,
- pojoFieldMapping: Option[Array[Int]],
- ruleDescription: String)
- : GeneratedFunction[FlatMapFunction[Any, Any]] = {
-
- val functionGenerator = new CodeGenerator(
- config,
- false,
- inputTypeInfo,
- Some(udtfTypeInfo),
- None,
- pojoFieldMapping)
-
- val (input1AccessExprs, input2AccessExprs) = functionGenerator.generateCorrelateAccessExprs
-
- val collectorTerm = functionGenerator
- .addReusableConstructor(classOf[TableFunctionCollector[_]])
- .head
-
- val call = functionGenerator.generateExpression(rexCall)
- var body =
- s"""
- |${call.resultTerm}.setCollector($collectorTerm);
- |${call.code}
- |""".stripMargin
-
- if (joinType == SemiJoinType.LEFT) {
- // left outer join
-
- // in case of left outer join and the returned row of table function is empty,
- // fill all fields of row with null
- val input2NullExprs = input2AccessExprs.map { x =>
- GeneratedExpression(
- primitiveDefaultValue(x.resultType),
- ALWAYS_NULL,
- NO_CODE,
- x.resultType)
- }
- val outerResultExpr = functionGenerator.generateResultExpression(
- input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames.asScala)
- body +=
- s"""
- |boolean hasOutput = $collectorTerm.isCollected();
- |if (!hasOutput) {
- | ${outerResultExpr.code}
- | ${functionGenerator.collectorTerm}.collect(${outerResultExpr.resultTerm});
- |}
- |""".stripMargin
- } else if (joinType != SemiJoinType.INNER) {
- throw TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
- }
-
- functionGenerator.generateFunction(
- ruleDescription,
- classOf[FlatMapFunction[Any, Any]],
- body,
- returnType)
- }
-
- /**
- * Generates table function collector.
- */
- private[flink] def generateCollector(
- config: TableConfig,
- inputTypeInfo: TypeInformation[Any],
- udtfTypeInfo: TypeInformation[Any],
- returnType: TypeInformation[Any],
- rowType: RelDataType,
- condition: Option[RexNode],
- pojoFieldMapping: Option[Array[Int]])
- : GeneratedCollector = {
-
- val generator = new CodeGenerator(
- config,
- false,
- inputTypeInfo,
- Some(udtfTypeInfo),
- None,
- pojoFieldMapping)
-
- val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
-
- val crossResultExpr = generator.generateResultExpression(
- input1AccessExprs ++ input2AccessExprs,
- returnType,
- rowType.getFieldNames.asScala)
-
- val collectorCode = if (condition.isEmpty) {
- s"""
- |${crossResultExpr.code}
- |getCollector().collect(${crossResultExpr.resultTerm});
- |""".stripMargin
- } else {
- val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo)
- filterGenerator.input1Term = filterGenerator.input2Term
- val filterCondition = filterGenerator.generateExpression(condition.get)
- s"""
- |${filterGenerator.reuseInputUnboxingCode()}
- |${filterCondition.code}
- |if (${filterCondition.resultTerm}) {
- | ${crossResultExpr.code}
- | getCollector().collect(${crossResultExpr.resultTerm});
- |}
- |""".stripMargin
- }
-
- generator.generateTableFunctionCollector(
- "TableFunctionCollector",
- collectorCode,
- udtfTypeInfo)
- }
-
- private[flink] def selectToString(rowType: RelDataType): String = {
- rowType.getFieldNames.asScala.mkString(",")
- }
-
- private[flink] def correlateOpName(
- rexCall: RexCall,
- sqlFunction: TableSqlFunction,
- rowType: RelDataType)
- : String = {
-
- s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: ${selectToString(rowType)}"
- }
-
- private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = {
- val udtfName = sqlFunction.getName
- val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",")
- s"table($udtfName($operands))"
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
index a7765d1..7ad9bd5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
@@ -66,43 +66,6 @@ trait FlinkRel {
}
}
- private[flink] def getConversionMapper(
- config: TableConfig,
- nullableInput: Boolean,
- inputType: TypeInformation[Any],
- expectedType: TypeInformation[Any],
- conversionOperatorName: String,
- fieldNames: Seq[String],
- inputPojoFieldMapping: Option[Array[Int]] = None)
- : MapFunction[Any, Any] = {
-
- val generator = new CodeGenerator(
- config,
- nullableInput,
- inputType,
- None,
- inputPojoFieldMapping)
- val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
-
- val body =
- s"""
- |${conversion.code}
- |return ${conversion.resultTerm};
- |""".stripMargin
-
- val genFunction = generator.generateFunction(
- conversionOperatorName,
- classOf[MapFunction[Any, Any]],
- body,
- expectedType)
-
- new MapRunner[Any, Any](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
-
- }
-
private[flink] def estimateRowSize(rowType: RelDataType): Double = {
val fieldList = rowType.getFieldList
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
index 252bb2e..09262a6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
@@ -21,12 +21,12 @@ package org.apache.flink.table.plan.nodes.dataset
import org.apache.calcite.plan._
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.PojoTypeInfo
import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonScan
import org.apache.flink.table.plan.schema.FlinkTable
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
@@ -36,6 +36,7 @@ abstract class BatchScan(
traitSet: RelTraitSet,
table: RelOptTable)
extends TableScan(cluster, traitSet, table)
+ with CommonScan
with DataSetRel {
override def toString: String = {
@@ -48,50 +49,34 @@ abstract class BatchScan(
planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
}
- protected def convertToExpectedType(
+ protected def convertToInternalRow(
input: DataSet[Any],
flinkTable: FlinkTable[_],
- expectedType: Option[TypeInformation[Any]],
- config: TableConfig): DataSet[Any] = {
+ config: TableConfig)
+ : DataSet[Row] = {
val inputType = input.getType
- expectedType match {
+ val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
- // special case:
- // if efficient type usage is enabled and no expected type is set
- // we can simply forward the DataSet to the next operator.
- // however, we cannot forward PojoTypes as their fields don't have an order
- case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
- input
+ // conversion
+ if (needsConversion(inputType, internalType)) {
- case _ =>
- val determinedType = determineReturnType(
- getRowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
+ val mapFunc = getConversionMapper(
+ config,
+ inputType,
+ internalType,
+ "DataSetSourceConversion",
+ getRowType.getFieldNames,
+ Some(flinkTable.fieldIndexes))
- // conversion
- if (determinedType != inputType) {
+ val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
- val mapFunc = getConversionMapper(
- config,
- nullableInput = false,
- inputType,
- determinedType,
- "DataSetSourceConversion",
- getRowType.getFieldNames,
- Some(flinkTable.fieldIndexes))
-
- val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
- input.map(mapFunc).name(opName)
- }
- // no conversion necessary, forward
- else {
- input
- }
+ input.map(mapFunc).name(opName)
+ }
+ // no conversion necessary, forward
+ else {
+ input.asInstanceOf[DataSet[Row]]
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 73dddc6..9b8e1ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.schema.TableSourceTable
import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.types.Row
/** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */
class BatchTableSourceScan(
@@ -62,13 +63,11 @@ class BatchTableSourceScan(
.item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val config = tableEnv.getConfig
val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]
- convertToExpectedType(inputDataSet, new TableSourceTable(tableSource), expectedType, config)
+ convertToInternalRow(inputDataSet, new TableSourceTable(tableSource), config)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
index 6771536..206e562 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -23,19 +23,15 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.plan.nodes.CommonAggregate
import org.apache.flink.table.runtime.aggregate.AggregateUtil
import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
-import org.apache.flink.table.typeutils.TypeConverter
-import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.types.Row
-import scala.collection.JavaConverters._
-
/**
* Flink RelNode which matches along with a LogicalAggregate.
*/
@@ -49,7 +45,7 @@ class DataSetAggregate(
grouping: Array[Int],
inGroupingSet: Boolean)
extends SingleRel(cluster, traitSet, inputNode)
- with FlinkAggregate
+ with CommonAggregate
with DataSetRel {
override def deriveRowType(): RelDataType = rowRelDataType
@@ -89,9 +85,7 @@ class DataSetAggregate(
planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
}
- override def translateToPlan(
- tableEnv: BatchTableEnvironment,
- expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+ override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
val config = tableEnv.getConfig
@@ -109,15 +103,7 @@ class DataSetAggregate(
grouping,
inGroupingSet)
- val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
- tableEnv,
- // tell the input operator that this operator currently only supports Rows as input
- Some(TypeConverter.DEFAULT_ROW_TYPE))
-
- // get the output types
- val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala
- .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
- .toArray
+ val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
val prepareOpName = s"prepare select: ($aggString)"
@@ -125,46 +111,26 @@ class DataSetAggregate(
.map(mapFunction)
.name(prepareOpName)
- val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
-
- val result = {
- if (groupingKeys.length > 0) {
- // grouped aggregation
- val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
- s"select: ($aggString)"
-
- mappedInput.asInstanceOf[DataSet[Row]]
- .groupBy(groupingKeys: _*)
- .reduceGroup(groupReduceFunction)
- .returns(rowTypeInfo)
- .name(aggOpName)
- .asInstanceOf[DataSet[Any]]
- }
- else {
- // global aggregation
- val aggOpName = s"select:($aggString)"
- mappedInput.asInstanceOf[DataSet[Row]]
- .reduceGroup(groupReduceFunction)
- .returns(rowTypeInfo)
- .name(aggOpName)
- .asInstanceOf[DataSet[Any]]
- }
- }
+ val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+ if (groupingKeys.length > 0) {
+ // grouped aggregation
+ val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
+ s"select: ($aggString)"
- // if the expected type is not a Row, inject a mapper to convert to the expected type
- expectedType match {
- case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
- val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
- result.map(getConversionMapper(
- config = config,
- nullableInput = false,
- inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
- expectedType = expectedType.get,
- conversionOperatorName = "DataSetAggregateConversion",
- fieldNames = getRowType.getFieldNames.asScala
- ))
- .name(mapName)
- case _ => result
+ mappedInput.asInstanceOf[DataSet[Row]]
+ .groupBy(groupingKeys: _*)
+ .reduceGroup(groupReduceFunction)
+ .returns(rowTypeInfo)
+ .name(aggOpName)
+ }
+ else {
+ // global aggregation
+ val aggOpName = s"select:($aggString)"
+ mappedInput.asInstanceOf[DataSet[Row]]
+ .reduceGroup(groupReduceFunction)
+ .returns(rowTypeInfo)
+ .name(aggOpName)
}
}
}