You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:54 UTC
[47/50] [abbrv] flink git commit: [FLINK-6091] [table] Implement and
turn on retraction for non-windowed aggregates.
[FLINK-6091] [table] Implement and turn on retraction for non-windowed aggregates.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/856485be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/856485be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/856485be
Branch: refs/heads/table-retraction
Commit: 856485be3be3bb240a068019fe21da3556f8b26f
Parents: 455a3c5
Author: Hequn Cheng <ch...@gmail.com>
Authored: Tue Apr 18 16:54:09 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed May 3 11:33:07 2017 +0200
----------------------------------------------------------------------
.../flink/table/api/BatchTableEnvironment.scala | 63 +++-
.../table/api/StreamTableEnvironment.scala | 62 +++-
.../flink/table/api/TableEnvironment.scala | 65 ++--
.../flink/table/plan/nodes/CommonCalc.scala | 26 +-
.../table/plan/nodes/CommonCorrelate.scala | 58 +---
.../flink/table/plan/nodes/CommonScan.scala | 24 +-
.../table/plan/nodes/dataset/BatchScan.scala | 10 +-
.../table/plan/nodes/dataset/DataSetCalc.scala | 23 +-
.../plan/nodes/dataset/DataSetCorrelate.scala | 32 +-
.../plan/nodes/dataset/DataSetValues.scala | 2 +-
.../plan/nodes/datastream/DataStreamCalc.scala | 30 +-
.../nodes/datastream/DataStreamCorrelate.scala | 38 ++-
.../datastream/DataStreamGroupAggregate.scala | 24 +-
.../DataStreamGroupWindowAggregate.scala | 47 ++-
.../datastream/DataStreamOverAggregate.scala | 62 ++--
.../plan/nodes/datastream/DataStreamRel.scala | 6 +-
.../plan/nodes/datastream/DataStreamScan.scala | 3 +-
.../plan/nodes/datastream/DataStreamUnion.scala | 3 +-
.../nodes/datastream/DataStreamValues.scala | 14 +-
.../plan/nodes/datastream/StreamScan.scala | 23 +-
.../datastream/StreamTableSourceScan.scala | 4 +-
.../plan/nodes/logical/FlinkLogicalCalc.scala | 2 +-
.../datastream/DataStreamRetractionRules.scala | 16 +-
.../runtime/CRowCorrelateFlatMapRunner.scala | 83 +++++
.../flink/table/runtime/CRowFlatMapRunner.scala | 72 ++++
.../table/runtime/CRowInputMapRunner.scala | 57 ++++
.../table/runtime/CRowOutputMapRunner.scala | 60 ++++
.../table/runtime/CRowWrappingCollector.scala | 41 +++
.../flink/table/runtime/FlatMapRunner.scala | 17 +-
.../aggregate/AggregateAggFunction.scala | 15 +-
.../table/runtime/aggregate/AggregateUtil.scala | 47 +--
...SetSessionWindowAggReduceGroupFunction.scala | 4 +-
...taSetSlideWindowAggReduceGroupFunction.scala | 4 +-
...TumbleTimeWindowAggReduceGroupFunction.scala | 4 +-
.../aggregate/GroupAggProcessFunction.scala | 58 +++-
...rementalAggregateAllTimeWindowFunction.scala | 7 +-
.../IncrementalAggregateAllWindowFunction.scala | 11 +-
...IncrementalAggregateTimeWindowFunction.scala | 7 +-
.../IncrementalAggregateWindowFunction.scala | 13 +-
.../aggregate/ProcTimeBoundedRangeOver.scala | 30 +-
.../aggregate/ProcTimeBoundedRowsOver.scala | 26 +-
.../ProcTimeUnboundedNonPartitionedOver.scala | 23 +-
.../ProcTimeUnboundedPartitionedOver.scala | 22 +-
.../aggregate/RowTimeBoundedRangeOver.scala | 32 +-
.../aggregate/RowTimeBoundedRowsOver.scala | 30 +-
.../aggregate/RowTimeUnboundedOver.scala | 50 +--
.../aggregate/TimeWindowPropertyCollector.scala | 34 +-
.../runtime/io/CRowValuesInputFormat.scala | 59 ++++
.../table/runtime/io/ValuesInputFormat.scala | 17 +-
.../apache/flink/table/runtime/types/CRow.scala | 55 +++
.../table/runtime/types/CRowComparator.scala | 83 +++++
.../table/runtime/types/CRowSerializer.scala | 78 +++++
.../table/runtime/types/CRowTypeInfo.scala | 98 ++++++
.../apache/flink/table/sinks/CsvTableSink.scala | 2 +
.../flink/table/TableEnvironmentTest.scala | 72 +++-
.../scala/batch/TableEnvironmentITCase.scala | 20 ++
.../api/scala/stream/RetractionITCase.scala | 331 +++++++++++++++++++
.../api/scala/stream/TableSinkITCase.scala | 2 +-
.../api/scala/stream/utils/StreamITCase.scala | 11 +-
...ProcessingOverRangeProcessFunctionTest.scala | 105 +++---
.../runtime/types/CRowComparatorTest.scala | 61 ++++
.../table/utils/MockTableEnvironment.scala | 8 +
62 files changed, 1896 insertions(+), 490 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 00cf11c..34f5018 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -26,16 +26,20 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.RuleSet
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.configuration.Configuration
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
import org.apache.flink.table.plan.rules.FlinkRuleSets
import org.apache.flink.table.plan.schema.{DataSetTable, TableSourceTable}
+import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
import org.apache.flink.table.sources.{BatchTableSource, TableSource}
import org.apache.flink.types.Row
@@ -128,6 +132,56 @@ abstract class BatchTableEnvironment(
}
/**
+ * Creates a final converter that maps the internal row type to external type.
+ *
+ * @param physicalTypeInfo the input of the sink
+ * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+ * @param requestedTypeInfo the output type of the sink
+ * @param functionName name of the map function. Must not be unique but has to be a
+ * valid Java class identifier.
+ */
+ override protected def getConversionMapper[IN, OUT](
+ physicalTypeInfo: TypeInformation[IN],
+ logicalRowType: RelDataType,
+ requestedTypeInfo: TypeInformation[OUT],
+ functionName: String):
+ Option[MapFunction[IN, OUT]] = {
+
+ if (requestedTypeInfo.getTypeClass == classOf[Row]) {
+ // Row to Row, no conversion needed
+ None
+ } else if (requestedTypeInfo.getTypeClass == classOf[CRow]) {
+ // Row to CRow, only needs to be wrapped
+ Some(
+ new RichMapFunction[Row, CRow] {
+ private var outCRow: CRow = _
+ override def open(parameters: Configuration): Unit = outCRow = new CRow(null, true)
+ override def map(value: Row): CRow = {
+ outCRow.row = value
+ outCRow
+ }
+ }.asInstanceOf[MapFunction[IN, OUT]]
+ )
+ } else {
+ // some type that is neither Row or CRow
+
+ val converterFunction = generateRowConverterFunction[OUT](
+ physicalTypeInfo.asInstanceOf[TypeInformation[Row]],
+ logicalRowType,
+ requestedTypeInfo,
+ functionName
+ )
+
+ val mapFunction = new MapRunner[IN, OUT](
+ converterFunction.name,
+ converterFunction.code,
+ converterFunction.returnType)
+
+ Some(mapFunction)
+ }
+ }
+
+ /**
* Returns the AST of the specified Table API and SQL queries and the execution plan to compute
* the result of the given [[Table]].
*
@@ -285,10 +339,15 @@ abstract class BatchTableEnvironment(
logicalPlan match {
case node: DataSetRel =>
val plan = node.translateToPlan(this)
- val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataSetSinkConversion")
+ val conversion =
+ getConversionMapper(plan.getType, logicalType, tpe, "DataSetSinkConversion")
conversion match {
case None => plan.asInstanceOf[DataSet[A]] // no conversion necessary
- case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe")
+ case Some(mapFunction: MapFunction[Row, A]) =>
+ plan.map(mapFunction)
+ .returns(tpe)
+ .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+ .asInstanceOf[DataSet[A]]
}
case _ =>
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index f532c5b..e2bccf3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{RuleSet, RuleSets}
+import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
@@ -36,6 +37,8 @@ import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
import org.apache.flink.table.plan.rules.FlinkRuleSets
import org.apache.flink.table.plan.schema.{DataStreamTable, TableSourceTable}
+import org.apache.flink.table.runtime.CRowInputMapRunner
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
import org.apache.flink.table.sources.{StreamTableSource, TableSource}
import org.apache.flink.types.Row
@@ -169,6 +172,52 @@ abstract class StreamTableEnvironment(
}
}
+
+ /**
+ * Creates a final converter that maps the internal row type to external type.
+ *
+ * @param physicalTypeInfo the input of the sink
+ * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
+ * @param requestedTypeInfo the output type of the sink
+ * @param functionName name of the map function. Must not be unique but has to be a
+ * valid Java class identifier.
+ */
+ override protected def getConversionMapper[IN, OUT](
+ physicalTypeInfo: TypeInformation[IN],
+ logicalRowType: RelDataType,
+ requestedTypeInfo: TypeInformation[OUT],
+ functionName: String):
+ Option[MapFunction[IN, OUT]] = {
+
+ if (requestedTypeInfo.getTypeClass == classOf[CRow]) {
+ // CRow to CRow, no conversion needed
+ None
+ } else if (requestedTypeInfo.getTypeClass == classOf[Row]) {
+ // CRow to Row, only needs to be unwrapped
+ Some(
+ new MapFunction[CRow, Row] {
+ override def map(value: CRow): Row = value.row
+ }.asInstanceOf[MapFunction[IN, OUT]]
+ )
+ } else {
+ // Some type that is neither CRow nor Row
+
+ val converterFunction = generateRowConverterFunction[OUT](
+ physicalTypeInfo.asInstanceOf[CRowTypeInfo].rowType,
+ logicalRowType,
+ requestedTypeInfo,
+ functionName
+ )
+
+ Some(new CRowInputMapRunner[OUT](
+ converterFunction.name,
+ converterFunction.code,
+ converterFunction.returnType)
+ .asInstanceOf[MapFunction[IN, OUT]])
+
+ }
+ }
+
/**
* Registers a [[DataStream]] as a table under a given name in the [[TableEnvironment]]'s
* catalog.
@@ -334,10 +383,15 @@ abstract class StreamTableEnvironment(
logicalPlan match {
case node: DataStreamRel =>
val plan = node.translateToPlan(this)
- val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataStreamSinkConversion")
+ val conversion =
+ getConversionMapper(plan.getType, logicalType, tpe, "DataStreamSinkConversion")
conversion match {
case None => plan.asInstanceOf[DataStream[A]] // no conversion necessary
- case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe")
+ case Some(mapFunction: MapFunction[CRow, A]) =>
+ plan.map(mapFunction)
+ .returns(tpe)
+ .name(s"to: ${tpe.getTypeClass.getSimpleName}")
+ .asInstanceOf[DataStream[A]]
}
case _ =>
@@ -355,9 +409,9 @@ abstract class StreamTableEnvironment(
def explain(table: Table): String = {
val ast = table.getRelNode
val optimizedPlan = optimize(ast)
- val dataStream = translate[Row](
+ val dataStream = translate[CRow](
optimizedPlan,
- ast.getRowType)(new GenericTypeInfo(classOf[Row]))
+ ast.getRowType)(new GenericTypeInfo(classOf[CRow]))
val env = dataStream.getExecutionEnvironment
val jsonSqlPlan = env.getExecutionPlan
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index bd974b0..0a37f00 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -48,7 +48,7 @@ import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableE
import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
-import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer}
+import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer, GeneratedFunction}
import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions}
import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
@@ -56,7 +56,7 @@ import org.apache.flink.table.plan.cost.DataSetCostFactory
import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.table.plan.rules.FlinkRuleSets
import org.apache.flink.table.plan.schema.RelTable
-import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.runtime.types.CRowTypeInfo
import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
import org.apache.flink.table.validate.FunctionCatalog
@@ -615,6 +615,18 @@ abstract class TableEnvironment(val config: TableConfig) {
case _ => throw new TableException(
"Field reference expression or alias on field expression expected.")
}
+ case cr: CRowTypeInfo =>
+ exprs.zipWithIndex.map {
+ case (UnresolvedFieldReference(name), idx) => (idx, name)
+ case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
+ val idx = cr.getFieldIndex(origName)
+ if (idx < 0) {
+ throw new TableException(s"$origName is not a field of type $cr")
+ }
+ (idx, name)
+ case _ => throw new TableException(
+ "Field reference expression or alias on field expression expected.")
+ }
case c: CaseClassTypeInfo[A] =>
exprs.zipWithIndex.map {
case (UnresolvedFieldReference(name), idx) => (idx, name)
@@ -660,37 +672,38 @@ abstract class TableEnvironment(val config: TableConfig) {
/**
* Creates a final converter that maps the internal row type to external type.
*
- * @param physicalRowTypeInfo the input of the sink
+ * @param physicalTypeInfo the input of the sink
* @param logicalRowType the logical type with correct field names (esp. for POJO field mapping)
* @param requestedTypeInfo the output type of the sink
* @param functionName name of the map function. Must not be unique but has to be a
* valid Java class identifier.
*/
- protected def sinkConversion[T](
- physicalRowTypeInfo: TypeInformation[Row],
+ protected def getConversionMapper[IN, OUT](
+ physicalTypeInfo: TypeInformation[IN],
+ logicalRowType: RelDataType,
+ requestedTypeInfo: TypeInformation[OUT],
+ functionName: String):
+ Option[MapFunction[IN, OUT]]
+
+ protected def generateRowConverterFunction[OUT](
+ inputTypeInfo: TypeInformation[Row],
logicalRowType: RelDataType,
- requestedTypeInfo: TypeInformation[T],
- functionName: String)
- : Option[MapFunction[Row, T]] = {
+ requestedTypeInfo: TypeInformation[OUT],
+ functionName: String):
+ GeneratedFunction[MapFunction[Row, OUT], OUT] = {
// validate that at least the field types of physical and logical type match
// we do that here to make sure that plan translation was correct
val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType)
- if (physicalRowTypeInfo != logicalRowTypeInfo) {
+ if (logicalRowTypeInfo != inputTypeInfo) {
throw TableException("The field types of physical and logical row types do not match." +
"This is a bug and should not happen. Please file an issue.")
}
- // requested type is a generic Row, no conversion needed
- if (requestedTypeInfo.isInstanceOf[GenericTypeInfo[_]] &&
- requestedTypeInfo.getTypeClass == classOf[Row]) {
- return None
- }
-
// convert to type information
- val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType =>
- FlinkTypeFactory.toTypeInfo(relDataType.getType)
- }
+ val logicalFieldTypes = logicalRowType.getFieldList.asScala
+ .map(t => FlinkTypeFactory.toTypeInfo(t.getType))
+
// field names
val logicalFieldNames = logicalRowType.getFieldNames.asScala
@@ -698,8 +711,8 @@ abstract class TableEnvironment(val config: TableConfig) {
if (requestedTypeInfo.getArity != logicalFieldTypes.length) {
throw new TableException("Arity of result does not match requested type.")
}
- requestedTypeInfo match {
+ requestedTypeInfo match {
// POJO type requested
case pt: PojoTypeInfo[_] =>
logicalFieldNames.zip(logicalFieldTypes) foreach {
@@ -746,7 +759,7 @@ abstract class TableEnvironment(val config: TableConfig) {
val generator = new CodeGenerator(
config,
false,
- physicalRowTypeInfo,
+ inputTypeInfo,
None,
None)
@@ -760,20 +773,12 @@ abstract class TableEnvironment(val config: TableConfig) {
|return ${conversion.resultTerm};
|""".stripMargin
- val genFunction = generator.generateFunction(
+ generator.generateFunction(
functionName,
- classOf[MapFunction[Row, T]],
+ classOf[MapFunction[Row, OUT]],
body,
requestedTypeInfo)
-
- val mapFunction = new MapRunner[Row, T](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
-
- Some(mapFunction)
}
-
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index 96a7470..bec52ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -26,21 +26,21 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
-import org.apache.flink.table.runtime.FlatMapRunner
import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
-trait CommonCalc {
+trait CommonCalc[T] {
- private[flink] def functionBody(
+ private[flink] def generateFunction(
generator: CodeGenerator,
+ ruleDescription: String,
inputType: TypeInformation[Row],
rowType: RelDataType,
calcProgram: RexProgram,
- config: TableConfig)
- : String = {
+ config: TableConfig):
+ GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
@@ -53,7 +53,7 @@ trait CommonCalc {
expandedExpressions)
// only projection
- if (condition == null) {
+ val body = if (condition == null) {
s"""
|${projection.code}
|${generator.collectorTerm}.collect(${projection.resultTerm});
@@ -82,16 +82,12 @@ trait CommonCalc {
|""".stripMargin
}
}
- }
-
- private[flink] def calcMapFunction(
- genFunction: GeneratedFunction[FlatMapFunction[Row, Row], Row])
- : RichFlatMapFunction[Row, Row] = {
- new FlatMapRunner[Row, Row](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
+ generator.generateFunction(
+ ruleDescription,
+ classOf[FlatMapFunction[Row, Row]],
+ body,
+ returnType)
}
private[flink] def conditionToString(
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index 6c4066b..83a68c0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -23,12 +23,11 @@ import org.apache.calcite.sql.SemiJoinType
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{TableConfig, TableException}
-import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction}
import org.apache.flink.table.functions.utils.TableSqlFunction
-import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector}
+import org.apache.flink.table.runtime.TableFunctionCollector
import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -36,59 +35,12 @@ import scala.collection.JavaConverters._
/**
* Join a user-defined table function
*/
-trait CommonCorrelate {
-
- /**
- * Creates the [[CorrelateFlatMapRunner]] to execute the join of input table
- * and user-defined table function.
- */
- private[flink] def correlateMapFunction(
- config: TableConfig,
- inputTypeInfo: TypeInformation[Row],
- udtfTypeInfo: TypeInformation[Any],
- rowType: RelDataType,
- joinType: SemiJoinType,
- rexCall: RexCall,
- condition: Option[RexNode],
- pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping
- ruleDescription: String)
- : CorrelateFlatMapRunner[Row, Row] = {
-
- val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType)
-
- val flatMap = generateFunction(
- config,
- inputTypeInfo,
- udtfTypeInfo,
- returnType,
- rowType,
- joinType,
- rexCall,
- pojoFieldMapping,
- ruleDescription)
-
- val collector = generateCollector(
- config,
- inputTypeInfo,
- udtfTypeInfo,
- returnType,
- rowType,
- condition,
- pojoFieldMapping)
-
- new CorrelateFlatMapRunner[Row, Row](
- flatMap.name,
- flatMap.code,
- collector.name,
- collector.code,
- flatMap.returnType)
-
- }
+trait CommonCorrelate[T] {
/**
* Generates the flat map function to run the user-defined table function.
*/
- private def generateFunction(
+ private[flink] def generateFunction(
config: TableConfig,
inputTypeInfo: TypeInformation[Row],
udtfTypeInfo: TypeInformation[Any],
@@ -97,8 +49,8 @@ trait CommonCorrelate {
joinType: SemiJoinType,
rexCall: RexCall,
pojoFieldMapping: Option[Array[Int]],
- ruleDescription: String)
- : GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
+ ruleDescription: String):
+ GeneratedFunction[FlatMapFunction[Row, Row], Row] = {
val functionGenerator = new CodeGenerator(
config,
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
index 0a0d204..5c44525 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
@@ -21,14 +21,13 @@ package org.apache.flink.table.plan.nodes
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
import org.apache.flink.types.Row
/**
* Common class for batch and stream scans.
*/
-trait CommonScan {
+trait CommonScan[T] {
/**
* We check if the input type is exactly the same as the internal row type.
@@ -36,20 +35,17 @@ trait CommonScan {
*/
private[flink] def needsConversion(
externalTypeInfo: TypeInformation[Any],
- internalTypeInfo: TypeInformation[Row])
- : Boolean = {
-
+ internalTypeInfo: TypeInformation[T]): Boolean =
externalTypeInfo != internalTypeInfo
- }
- private[flink] def getConversionMapper(
+ private[flink] def generateConversionFunction(
config: TableConfig,
inputType: TypeInformation[Any],
expectedType: TypeInformation[Row],
conversionOperatorName: String,
fieldNames: Seq[String],
- inputPojoFieldMapping: Option[Array[Int]] = None)
- : MapFunction[Any, Row] = {
+ inputPojoFieldMapping: Option[Array[Int]] = None):
+ GeneratedFunction[MapFunction[Any, Row], Row] = {
val generator = new CodeGenerator(
config,
@@ -65,17 +61,11 @@ trait CommonScan {
|return ${conversion.resultTerm};
|""".stripMargin
- val genFunction = generator.generateFunction(
+ generator.generateFunction(
conversionOperatorName,
classOf[MapFunction[Any, Row]],
body,
expectedType)
-
- new MapRunner[Any, Row](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
-
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
index b39b8ed..64286f0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
@@ -23,12 +23,13 @@ import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.CommonScan
import org.apache.flink.table.plan.schema.FlinkTable
+import org.apache.flink.table.runtime.MapRunner
import org.apache.flink.types.Row
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
-trait BatchScan extends CommonScan with DataSetRel {
+trait BatchScan extends CommonScan[Row] with DataSetRel {
protected def convertToInternalRow(
input: DataSet[Any],
@@ -43,7 +44,7 @@ trait BatchScan extends CommonScan with DataSetRel {
// conversion
if (needsConversion(inputType, internalType)) {
- val mapFunc = getConversionMapper(
+ val function = generateConversionFunction(
config,
inputType,
internalType,
@@ -51,6 +52,11 @@ trait BatchScan extends CommonScan with DataSetRel {
getRowType.getFieldNames,
Some(flinkTable.fieldIndexes))
+ val mapFunc = new MapRunner[Any, Row](
+ function.name,
+ function.code,
+ function.returnType)
+
val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
input.map(mapFunc).name(opName)
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index e05b5a8..359f5a5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -24,12 +24,15 @@ import org.apache.calcite.rel.core.Calc
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rex._
-import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.BatchTableEnvironment
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
import org.apache.flink.table.plan.nodes.CommonCalc
+import org.apache.flink.table.runtime.FlatMapRunner
import org.apache.flink.types.Row
/**
@@ -44,7 +47,7 @@ class DataSetCalc(
calcProgram: RexProgram,
ruleDescription: String)
extends Calc(cluster, traitSet, input, calcProgram)
- with CommonCalc
+ with CommonCalc[Row]
with DataSetRel {
override def deriveRowType(): RelDataType = rowRelDataType
@@ -83,24 +86,22 @@ class DataSetCalc(
val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
- val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+ val returnType = FlinkTypeFactory
+ .toInternalRowTypeInfo(getRowType)
+ .asInstanceOf[RowTypeInfo]
val generator = new CodeGenerator(config, false, inputDS.getType)
- val body = functionBody(
+ val genFunction = generateFunction(
generator,
+ ruleDescription,
inputDS.getType,
getRowType,
calcProgram,
config)
- val genFunction = generator.generateFunction(
- ruleDescription,
- classOf[FlatMapFunction[Row, Row]],
- body,
- returnType)
+ val mapFunc = new FlatMapRunner(genFunction.name, genFunction.code, returnType)
- val mapFunc = calcMapFunction(genFunction)
inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
index 2a62e21..1ac6f6f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -26,9 +26,11 @@ import org.apache.calcite.sql.SemiJoinType
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.functions.utils.TableSqlFunction
import org.apache.flink.table.plan.nodes.CommonCorrelate
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.runtime.CorrelateFlatMapRunner
import org.apache.flink.types.Row
/**
@@ -45,7 +47,7 @@ class DataSetCorrelate(
joinType: SemiJoinType,
ruleDescription: String)
extends SingleRel(cluster, traitSet, inputNode)
- with CommonCorrelate
+ with CommonCorrelate[Row]
with DataSetRel {
override def deriveRowType() = relRowType
@@ -95,20 +97,38 @@ class DataSetCorrelate(
val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
val rexCall = funcRel.getCall.asInstanceOf[RexCall]
val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
- val pojoFieldMapping = sqlFunction.getPojoFieldMapping
+ val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping)
val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
- val mapFunc = correlateMapFunction(
+ val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+
+ val flatMap = generateFunction(
config,
inputDS.getType,
udtfTypeInfo,
- getRowType,
+ returnType,
+ rowType,
joinType,
rexCall,
- condition,
- Some(pojoFieldMapping),
+ pojoFieldMapping,
ruleDescription)
+ val collector = generateCollector(
+ config,
+ inputDS.getType,
+ udtfTypeInfo,
+ returnType,
+ rowType,
+ condition,
+ pojoFieldMapping)
+
+ val mapFunc = new CorrelateFlatMapRunner[Row, Row](
+ flatMap.name,
+ flatMap.code,
+ collector.name,
+ collector.code,
+ flatMap.returnType)
+
inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
index 3ebee2c..948dd27 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetValues.scala
@@ -88,7 +88,7 @@ class DataSetValues(
generatedRecords.map(_.code),
returnType)
- val inputFormat = new ValuesInputFormat[Row](
+ val inputFormat = new ValuesInputFormat(
generatedFunction.name,
generatedFunction.code,
generatedFunction.returnType)
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
index b015a1d..0bf723d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
@@ -24,12 +24,15 @@ import org.apache.calcite.rel.core.Calc
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rex.RexProgram
-import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
import org.apache.flink.table.plan.nodes.CommonCalc
+import org.apache.flink.table.runtime.CRowFlatMapRunner
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.types.Row
/**
@@ -44,7 +47,7 @@ class DataStreamCalc(
calcProgram: RexProgram,
ruleDescription: String)
extends Calc(cluster, traitSet, input, calcProgram)
- with CommonCalc
+ with CommonCalc[CRow]
with DataStreamRel {
override def deriveRowType(): RelDataType = rowRelDataType
@@ -75,28 +78,29 @@ class DataStreamCalc(
estimateRowCount(calcProgram, rowCnt)
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
val config = tableEnv.getConfig
val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+ val inputRowType = inputDataStream.getType.asInstanceOf[CRowTypeInfo].rowType
+ val outputRowType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
- val generator = new CodeGenerator(config, false, inputDataStream.getType)
+ val generator = new CodeGenerator(config, false, inputRowType)
- val body = functionBody(
+ val genFunction = generateFunction(
generator,
- inputDataStream.getType,
+ ruleDescription,
+ inputRowType,
getRowType,
calcProgram,
config)
- val genFunction = generator.generateFunction(
- ruleDescription,
- classOf[FlatMapFunction[Row, Row]],
- body,
- FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
+ val mapFunc = new CRowFlatMapRunner(
+ genFunction.name,
+ genFunction.code,
+ CRowTypeInfo(outputRowType))
- val mapFunc = calcMapFunction(genFunction)
inputDataStream.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
index 7680904..dff5099 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -25,10 +25,12 @@ import org.apache.calcite.sql.SemiJoinType
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.functions.utils.TableSqlFunction
import org.apache.flink.table.plan.nodes.CommonCorrelate
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableFunctionScan
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.CRowCorrelateFlatMapRunner
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
/**
* Flink RelNode which matches along with join a user defined table function.
@@ -44,7 +46,7 @@ class DataStreamCorrelate(
joinType: SemiJoinType,
ruleDescription: String)
extends SingleRel(cluster, traitSet, input)
- with CommonCorrelate
+ with CommonCorrelate[CRow]
with DataStreamRel {
override def deriveRowType() = relRowType
@@ -79,30 +81,50 @@ class DataStreamCorrelate(
.itemIf("condition", condition.orNull, condition.isDefined)
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
val config = tableEnv.getConfig
// we do not need to specify input type
val inputDS = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+ val inputType = inputDS.getType.asInstanceOf[CRowTypeInfo]
val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
val rexCall = funcRel.getCall.asInstanceOf[RexCall]
val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
- val pojoFieldMapping = sqlFunction.getPojoFieldMapping
+ val pojoFieldMapping = Some(sqlFunction.getPojoFieldMapping)
val udtfTypeInfo = sqlFunction.getRowTypeInfo.asInstanceOf[TypeInformation[Any]]
- val mapFunc = correlateMapFunction(
+ val rowType = inputType.rowType
+ val returnType = FlinkTypeFactory.toInternalRowTypeInfo(relRowType)
+
+ val flatMap = generateFunction(
config,
- inputDS.getType,
+ rowType,
udtfTypeInfo,
+ returnType,
getRowType,
joinType,
rexCall,
- condition,
- Some(pojoFieldMapping),
+ pojoFieldMapping,
ruleDescription)
+ val collector = generateCollector(
+ config,
+ rowType,
+ udtfTypeInfo,
+ returnType,
+ getRowType,
+ condition,
+ pojoFieldMapping)
+
+ val mapFunc = new CRowCorrelateFlatMapRunner(
+ flatMap.name,
+ flatMap.code,
+ collector.name,
+ collector.code,
+ CRowTypeInfo(flatMap.returnType))
+
inputDS.flatMap(mapFunc).name(correlateOpName(rexCall, sqlFunction, relRowType))
}
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index d88c72b..14ae343 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -28,8 +28,9 @@ import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.runtime.aggregate._
import org.apache.flink.table.plan.nodes.CommonAggregate
-import org.apache.flink.types.Row
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
/**
*
@@ -91,11 +92,10 @@ class DataStreamGroupAggregate(
.item("select", aggregationToString(inputType, groupings, getRowType, namedAggregates, Nil))
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
-
- val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+ val outRowType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
val generator = new CodeGenerator(
tableEnv.getConfig,
@@ -117,28 +117,30 @@ class DataStreamGroupAggregate(
generator,
namedAggregates,
inputType,
- groupings)
+ groupings,
+ DataStreamRetractionRules.isAccRetract(this),
+ DataStreamRetractionRules.isAccRetract(getInput))
- val result: DataStream[Row] =
+ val result: DataStream[CRow] =
// grouped / keyed aggregation
if (groupings.nonEmpty) {
inputDS
.keyBy(groupings: _*)
.process(processFunction)
- .returns(rowTypeInfo)
+ .returns(outRowType)
.name(keyedAggOpName)
- .asInstanceOf[DataStream[Row]]
+ .asInstanceOf[DataStream[CRow]]
}
// global / non-keyed aggregation
else {
inputDS
- .keyBy(new NullByteKeySelector[Row])
+ .keyBy(new NullByteKeySelector[CRow])
.process(processFunction)
.setParallelism(1)
.setMaxParallelism(1)
- .returns(rowTypeInfo)
+ .returns(outRowType)
.name(nonKeyedAggOpName)
- .asInstanceOf[DataStream[Row]]
+ .asInstanceOf[DataStream[CRow]]
}
result
}
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 8959b23..d503792 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream,
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenerator
@@ -35,11 +35,12 @@ import org.apache.flink.table.expressions._
import org.apache.flink.table.plan.logical._
import org.apache.flink.table.plan.nodes.CommonAggregate
import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate._
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
import org.apache.flink.table.runtime.aggregate.AggregateUtil._
import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
-import org.apache.flink.types.Row
class DataStreamGroupWindowAggregate(
window: LogicalWindow,
@@ -103,11 +104,20 @@ class DataStreamGroupWindowAggregate(
namedProperties))
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
- val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+ val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+
+ if (consumeRetraction) {
+ throw new TableException(
+ "Retraction on windowed GroupBy aggregation is not supported yet. " +
+ "Note: Windowed GroupBy aggregation should not follow a " +
+ "non-windowed GroupBy aggregation.")
+ }
+
+ val outRowType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
val aggString = aggregationToString(
inputType,
@@ -138,17 +148,19 @@ class DataStreamGroupWindowAggregate(
val keyedStream = inputDS.keyBy(grouping: _*)
val windowedStream =
createKeyedWindowedStream(window, keyedStream)
- .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+ .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]]
val (aggFunction, accumulatorRowType, aggResultRowType) =
AggregateUtil.createDataStreamAggregateFunction(
generator,
namedAggregates,
inputType,
- rowRelDataType)
+ rowRelDataType,
+ grouping
+ )
windowedStream
- .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
+ .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, outRowType)
.name(keyedAggOpName)
}
// global / non-keyed aggregation
@@ -160,17 +172,19 @@ class DataStreamGroupWindowAggregate(
val windowedStream =
createNonKeyedWindowedStream(window, inputDS)
- .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+ .asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]]
val (aggFunction, accumulatorRowType, aggResultRowType) =
AggregateUtil.createDataStreamAggregateFunction(
generator,
namedAggregates,
inputType,
- rowRelDataType)
+ rowRelDataType,
+ Array[Int]()
+ )
windowedStream
- .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo)
+ .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, outRowType)
.name(nonKeyedAggOpName)
}
}
@@ -178,9 +192,10 @@ class DataStreamGroupWindowAggregate(
object DataStreamGroupWindowAggregate {
-
- private def createKeyedWindowedStream(groupWindow: LogicalWindow, stream: KeyedStream[Row, Tuple])
- : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
+ private def createKeyedWindowedStream(
+ groupWindow: LogicalWindow,
+ stream: KeyedStream[CRow, Tuple]):
+ WindowedStream[CRow, Tuple, _ <: DataStreamWindow] = groupWindow match {
case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
stream.window(TumblingProcessingTimeWindows.of(asTime(size)))
@@ -221,8 +236,10 @@ object DataStreamGroupWindowAggregate {
stream.window(EventTimeSessionWindows.withGap(asTime(gap)))
}
- private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
- : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {
+ private def createNonKeyedWindowedStream(
+ groupWindow: LogicalWindow,
+ stream: DataStream[CRow]):
+ AllWindowedStream[CRow, _ <: DataStreamWindow] = groupWindow match {
case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size)))
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 031d533..a0d10a2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -25,18 +25,17 @@ import org.apache.calcite.rel.core.{AggregateCall, Window}
import org.apache.calcite.rel.core.Window.Group
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
-import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.OverAggregate
import org.apache.flink.table.runtime.aggregate._
-import org.apache.flink.types.Row
-
import org.apache.flink.api.java.functions.NullByteKeySelector
import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
+import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
class DataStreamOverAggregate(
logicWindow: Window,
@@ -87,7 +86,7 @@ class DataStreamOverAggregate(
namedAggregates))
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
if (logicWindow.groups.size > 1) {
throw new TableException(
"Unsupported use of OVER windows. All aggregates must be computed on the same window.")
@@ -110,6 +109,8 @@ class DataStreamOverAggregate(
val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
+ val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+
val generator = new CodeGenerator(
tableEnv.getConfig,
false,
@@ -120,6 +121,12 @@ class DataStreamOverAggregate(
.get(orderKey.getFieldIndex)
.getValue
+ if (consumeRetraction) {
+ throw new TableException(
+ "Retraction on Over window aggregation is not supported yet. " +
+ "Note: Over window aggregation should not follow a non-windowed GroupBy aggregation.")
+ }
+
timeType match {
case _: ProcTimeType =>
// proc-time OVER window
@@ -138,8 +145,7 @@ class DataStreamOverAggregate(
generator,
inputDS,
isRowTimeType = false,
- isRowsClause = overWindow.isRows
- )
+ isRowsClause = overWindow.isRows)
} else {
throw new TableException(
"OVER RANGE FOLLOWING windows are not supported yet.")
@@ -153,16 +159,14 @@ class DataStreamOverAggregate(
generator,
inputDS,
isRowTimeType = true,
- isRowsClause = overWindow.isRows
- )
+ isRowsClause = overWindow.isRows)
} else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
// bounded OVER window
createBoundedAndCurrentRowOverWindow(
generator,
inputDS,
isRowTimeType = true,
- isRowsClause = overWindow.isRows
- )
+ isRowsClause = overWindow.isRows)
} else {
throw new TableException(
"OVER RANGE FOLLOWING windows are not supported yet.")
@@ -177,16 +181,16 @@ class DataStreamOverAggregate(
def createUnboundedAndCurrentRowOverWindow(
generator: CodeGenerator,
- inputDS: DataStream[Row],
+ inputDS: DataStream[CRow],
isRowTimeType: Boolean,
- isRowsClause: Boolean): DataStream[Row] = {
+ isRowsClause: Boolean): DataStream[CRow] = {
val overWindow: Group = logicWindow.groups.get(0)
val partitionKeys: Array[Int] = overWindow.keys.toArray
val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
// get the output types
- val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+ val returnTypeInfo = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
val processFunction = AggregateUtil.createUnboundedOverProcessFunction(
generator,
@@ -196,30 +200,28 @@ class DataStreamOverAggregate(
partitionKeys.nonEmpty,
isRowsClause)
- val result: DataStream[Row] =
+ val result: DataStream[CRow] =
// partitioned aggregation
if (partitionKeys.nonEmpty) {
inputDS
.keyBy(partitionKeys: _*)
.process(processFunction)
- .returns(rowTypeInfo)
+ .returns(returnTypeInfo)
.name(aggOpName)
- .asInstanceOf[DataStream[Row]]
+ .asInstanceOf[DataStream[CRow]]
}
// non-partitioned aggregation
else {
if (isRowTimeType) {
- inputDS.keyBy(new NullByteKeySelector[Row])
+ inputDS.keyBy(new NullByteKeySelector[CRow])
.process(processFunction).setParallelism(1).setMaxParallelism(1)
- .returns(rowTypeInfo)
+ .returns(returnTypeInfo)
.name(aggOpName)
- .asInstanceOf[DataStream[Row]]
} else {
inputDS
.process(processFunction).setParallelism(1).setMaxParallelism(1)
- .returns(rowTypeInfo)
+ .returns(returnTypeInfo)
.name(aggOpName)
- .asInstanceOf[DataStream[Row]]
}
}
result
@@ -227,9 +229,9 @@ class DataStreamOverAggregate(
def createBoundedAndCurrentRowOverWindow(
generator: CodeGenerator,
- inputDS: DataStream[Row],
+ inputDS: DataStream[CRow],
isRowTimeType: Boolean,
- isRowsClause: Boolean): DataStream[Row] = {
+ isRowsClause: Boolean): DataStream[CRow] = {
val overWindow: Group = logicWindow.groups.get(0)
val partitionKeys: Array[Int] = overWindow.keys.toArray
@@ -239,9 +241,9 @@ class DataStreamOverAggregate(
getLowerBoundary(logicWindow, overWindow, getInput()) + (if (isRowsClause) 1 else 0)
// get the output types
- val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+ val returnTypeInfo = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
- val processFunction = AggregateUtil.createBoundedOverProcessFunction(
+ val processFunction = AggregateUtil.createBoundedOverProcessFunction[CRow](
generator,
namedAggregates,
inputType,
@@ -249,24 +251,22 @@ class DataStreamOverAggregate(
isRowsClause,
isRowTimeType
)
- val result: DataStream[Row] =
+ val result: DataStream[CRow] =
// partitioned aggregation
if (partitionKeys.nonEmpty) {
inputDS
.keyBy(partitionKeys: _*)
.process(processFunction)
- .returns(rowTypeInfo)
+ .returns(returnTypeInfo)
.name(aggOpName)
- .asInstanceOf[DataStream[Row]]
}
// non-partitioned aggregation
else {
inputDS
- .keyBy(new NullByteKeySelector[Row])
+ .keyBy(new NullByteKeySelector[CRow])
.process(processFunction).setParallelism(1).setMaxParallelism(1)
- .returns(rowTypeInfo)
+ .returns(returnTypeInfo)
.name(aggOpName)
- .asInstanceOf[DataStream[Row]]
}
result
}
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index 50d1d06..5c009a1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes.datastream
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
import org.apache.flink.table.plan.nodes.FlinkRelNode
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.types.CRow
trait DataStreamRel extends FlinkRelNode {
@@ -29,9 +29,9 @@ trait DataStreamRel extends FlinkRelNode {
* Translates the FlinkRelNode into a Flink operator.
*
* @param tableEnv The [[StreamTableEnvironment]] of the translated Table.
- * @return DataStream of type [[Row]]
+ * @return DataStream of type [[CRow]]
*/
- def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row]
+ def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[CRow]
/**
* Whether the [[DataStreamRel]] requires that update and delete changes are sent with retraction
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index c187ae8..1a43edb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.core.TableScan
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
/**
@@ -53,7 +54,7 @@ class DataStreamScan(
)
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
val config = tableEnv.getConfig
val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
convertToInternalRow(inputDataStream, dataStreamTable, config)
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index f340ac7..68e4e6d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -23,6 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@@ -60,7 +61,7 @@ class DataStreamUnion(
s"Union All(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
val leftDataSet = left.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
val rightDataSet = right.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index 0ab4a48..5e8d127 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -28,8 +28,8 @@ import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.runtime.io.ValuesInputFormat
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.io.{CRowValuesInputFormat}
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import scala.collection.JavaConverters._
@@ -57,11 +57,11 @@ class DataStreamValues(
)
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
val config = tableEnv.getConfig
- val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+ val returnType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
val generator = new CodeGenerator(config)
@@ -77,12 +77,12 @@ class DataStreamValues(
val generatedFunction = generator.generateValuesInputFormat(
ruleDescription,
generatedRecords.map(_.code),
- returnType)
+ returnType.rowType)
- val inputFormat = new ValuesInputFormat[Row](
+ val inputFormat = new CRowValuesInputFormat(
generatedFunction.name,
generatedFunction.code,
- generatedFunction.returnType)
+ returnType)
tableEnv.execEnv.createInput(inputFormat, returnType)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
index 6d08302..02716cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
@@ -23,41 +23,46 @@ import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.CommonScan
import org.apache.flink.table.plan.schema.FlinkTable
-import org.apache.flink.types.Row
+import org.apache.flink.table.runtime.CRowOutputMapRunner
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
-trait StreamScan extends CommonScan with DataStreamRel {
+trait StreamScan extends CommonScan[CRow] with DataStreamRel {
protected def convertToInternalRow(
input: DataStream[Any],
flinkTable: FlinkTable[_],
config: TableConfig)
- : DataStream[Row] = {
+ : DataStream[CRow] = {
val inputType = input.getType
-
- val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+ val internalType = CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(getRowType))
// conversion
if (needsConversion(inputType, internalType)) {
- val mapFunc = getConversionMapper(
+ val function = generateConversionFunction(
config,
inputType,
- internalType,
+ internalType.rowType,
"DataStreamSourceConversion",
getRowType.getFieldNames,
Some(flinkTable.fieldIndexes))
+ val mapFunc = new CRowOutputMapRunner(
+ function.name,
+ function.code,
+ internalType)
+
val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
- input.map(mapFunc).name(opName)
+ input.map(mapFunc).name(opName).returns(internalType)
}
// no conversion necessary, forward
else {
- input.asInstanceOf[DataStream[Row]]
+ input.asInstanceOf[DataStream[CRow]]
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 0a466a3..44ad706 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -25,8 +25,8 @@ import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
import org.apache.flink.table.plan.schema.TableSourceTable
+import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.table.sources.{StreamTableSource, TableSource}
-import org.apache.flink.types.Row
/** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
class StreamTableSourceScan(
@@ -64,7 +64,7 @@ class StreamTableSourceScan(
)
}
- override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
+ override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[CRow] = {
val config = tableEnv.getConfig
val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config)
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
index ec90392..0ca079e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalCalc.scala
@@ -34,7 +34,7 @@ class FlinkLogicalCalc(
calcProgram: RexProgram)
extends Calc(cluster, traitSet, input, calcProgram)
with FlinkLogicalRel
- with CommonCalc {
+ with CommonCalc[Any] {
override def copy(traitSet: RelTraitSet, child: RelNode, program: RexProgram): Calc = {
new FlinkLogicalCalc(cluster, traitSet, child, program)
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
index aeb67b6..bd9a7ee 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamRetractionRules.scala
@@ -82,6 +82,14 @@ object DataStreamRetractionRules {
}
/**
+ * Checks if a [[RelNode]] is in [[AccMode.AccRetract]] mode.
+ */
+ def isAccRetract(node: RelNode): Boolean = {
+ val accModeTrait = node.getTraitSet.getTrait(AccModeTraitDef.INSTANCE)
+ null != accModeTrait && accModeTrait.getAccMode == AccMode.AccRetract
+ }
+
+ /**
* Rule that assigns the default retraction information to [[DataStreamRel]] nodes.
* The default is to not publish updates as retraction messages and [[AccMode.Acc]].
*/
@@ -190,14 +198,6 @@ object DataStreamRetractionRules {
}
/**
- * Checks if a [[RelNode]] is in [[AccMode.AccRetract]] mode.
- */
- def isAccRetract(node: RelNode): Boolean = {
- val accModeTrait = node.getTraitSet.getTrait(AccModeTraitDef.INSTANCE)
- null != accModeTrait && accModeTrait.getAccMode == AccMode.AccRetract
- }
-
- /**
* Set [[AccMode.AccRetract]] to a [[RelNode]].
*/
def setAccRetract(relNode: RelNode): RelNode = {
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
new file mode 100644
index 0000000..66e51b1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowCorrelateFlatMapRunner.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.{Logger, LoggerFactory}
+
+/**
+ * A CorrelateFlatMapRunner with [[CRow]] input and [[CRow]] output.
+ *
+ * Compiles two generated classes at runtime: a [[TableFunctionCollector]] that
+ * receives the table function's output, and a flatMap function that processes
+ * each input row. The input [[CRow]]'s change flag (accumulate/retract) is
+ * forwarded unchanged to every emitted row via a [[CRowWrappingCollector]].
+ */
+class CRowCorrelateFlatMapRunner(
+ flatMapName: String,
+ flatMapCode: String,
+ collectorName: String,
+ collectorCode: String,
+ @transient returnType: TypeInformation[CRow])
+ extends RichFlatMapFunction[CRow, CRow]
+ with ResultTypeQueryable[CRow]
+ with Compiler[Any] {
+
+ val LOG: Logger = LoggerFactory.getLogger(this.getClass)
+
+ // generated flatMap function, compiled and instantiated in open()
+ private var function: FlatMapFunction[Row, Row] = _
+ // generated collector that post-processes the table function's rows
+ private var collector: TableFunctionCollector[_] = _
+ // wraps the framework Collector to attach the change flag to each Row
+ private var cRowWrapper: CRowWrappingCollector = _
+
+ override def open(parameters: Configuration): Unit = {
+ LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
+ val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
+ LOG.debug("Instantiating TableFunctionCollector.")
+ collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
+ this.cRowWrapper = new CRowWrappingCollector()
+
+ LOG.debug(s"Compiling FlatMapFunction: $flatMapName \n\n Code:\n$flatMapCode")
+ val flatMapClazz = compile(getRuntimeContext.getUserCodeClassLoader, flatMapName, flatMapCode)
+ // the generated flatMap function takes the collector as constructor argument
+ val constructor = flatMapClazz.getConstructor(classOf[TableFunctionCollector[_]])
+ LOG.debug("Instantiating FlatMapFunction.")
+ function = constructor.newInstance(collector).asInstanceOf[FlatMapFunction[Row, Row]]
+ FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+ FunctionUtils.openFunction(function, parameters)
+ }
+
+ override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
+ // propagate the input's accumulate/retract flag to all emitted rows
+ cRowWrapper.out = out
+ cRowWrapper.setChange(in.change)
+
+ // rebind and reset the collector for this input row before invoking the function
+ collector.setCollector(cRowWrapper)
+ collector.setInput(in.row)
+ collector.reset()
+
+ function.flatMap(in.row, cRowWrapper)
+ }
+
+ override def getProducedType: TypeInformation[CRow] = returnType
+
+ override def close(): Unit = {
+ FunctionUtils.closeFunction(function)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
new file mode 100644
index 0000000..9a4650b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowFlatMapRunner.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.util.FunctionUtils
+import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+/**
+ * FlatMapRunner with [[CRow]] input and [[CRow]] output.
+ *
+ * Compiles a generated FlatMapFunction over [[Row]] at runtime and adapts it
+ * to [[CRow]]: the input's change flag (accumulate/retract) is attached to
+ * every emitted row via a [[CRowWrappingCollector]].
+ */
+class CRowFlatMapRunner(
+ name: String,
+ code: String,
+ @transient returnType: TypeInformation[CRow])
+ extends RichFlatMapFunction[CRow, CRow]
+ with ResultTypeQueryable[CRow]
+ with Compiler[FlatMapFunction[Row, Row]] {
+
+ val LOG = LoggerFactory.getLogger(this.getClass)
+
+ // generated function, compiled from `code` in open()
+ private var function: FlatMapFunction[Row, Row] = _
+ // wraps the framework Collector to attach the change flag to each Row
+ private var cRowWrapper: CRowWrappingCollector = _
+
+ override def open(parameters: Configuration): Unit = {
+ LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code")
+ val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+ LOG.debug("Instantiating FlatMapFunction.")
+ function = clazz.newInstance()
+ FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
+ FunctionUtils.openFunction(function, parameters)
+
+ this.cRowWrapper = new CRowWrappingCollector()
+ }
+
+ override def flatMap(in: CRow, out: Collector[CRow]): Unit = {
+ // forward the input's change flag to all rows produced for this record
+ cRowWrapper.out = out
+ cRowWrapper.setChange(in.change)
+ function.flatMap(in.row, cRowWrapper)
+ }
+
+ override def getProducedType: TypeInformation[CRow] = returnType
+
+ override def close(): Unit = {
+ FunctionUtils.closeFunction(function)
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
new file mode 100644
index 0000000..8e95c93
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.LoggerFactory
+
+/**
+ * MapRunner with [[CRow]] input.
+ *
+ * Compiles a generated MapFunction over [[Row]] at runtime and applies it to
+ * the payload row of each incoming [[CRow]]. The change flag is not consulted
+ * here — only the wrapped [[Row]] is mapped to the output type OUT.
+ */
+class CRowInputMapRunner[OUT](
+ name: String,
+ code: String,
+ @transient returnType: TypeInformation[OUT])
+ extends RichMapFunction[CRow, OUT]
+ with ResultTypeQueryable[OUT]
+ with Compiler[MapFunction[Row, OUT]] {
+
+ val LOG = LoggerFactory.getLogger(this.getClass)
+
+ // generated function, compiled from `code` in open()
+ private var function: MapFunction[Row, OUT] = _
+
+ override def open(parameters: Configuration): Unit = {
+ LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+ val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+ LOG.debug("Instantiating MapFunction.")
+ function = clazz.newInstance()
+ }
+
+ override def map(in: CRow): OUT = {
+ // unwrap: map only the Row payload; the change flag is dropped
+ function.map(in.row)
+ }
+
+ override def getProducedType: TypeInformation[OUT] = returnType
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
new file mode 100644
index 0000000..966dea9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.codegen.Compiler
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+import org.slf4j.LoggerFactory
+
+/**
+ * MapRunner with [[CRow]] output.
+ *
+ * Compiles a generated MapFunction producing [[Row]] at runtime and wraps each
+ * result into a reused [[CRow]] whose change flag is always `true`
+ * (accumulate-only output).
+ */
+class CRowOutputMapRunner(
+ name: String,
+ code: String,
+ @transient returnType: TypeInformation[CRow])
+ extends RichMapFunction[Any, CRow]
+ with ResultTypeQueryable[CRow]
+ with Compiler[MapFunction[Any, Row]] {
+
+ val LOG = LoggerFactory.getLogger(this.getClass)
+
+ // generated function, compiled from `code` in open()
+ private var function: MapFunction[Any, Row] = _
+ // reused output wrapper; its change flag is fixed to true in open()
+ private var outCRow: CRow = _
+
+ override def open(parameters: Configuration): Unit = {
+ LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
+ val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code)
+ LOG.debug("Instantiating MapFunction.")
+ function = clazz.newInstance()
+ outCRow = new CRow(null, true)
+ }
+
+ override def map(in: Any): CRow = {
+ // NOTE: the same CRow instance is reused across invocations (object reuse);
+ // downstream operators must not hold on to the returned reference
+ outCRow.row = function.map(in)
+ outCRow
+ }
+
+ override def getProducedType: TypeInformation[CRow] = returnType
+}