Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:37 UTC
[03/15] flink git commit: [FLINK-5884] [table] Integrate time indicators for Table API & SQL
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 2224752..8eb9d40 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -20,36 +20,34 @@ package org.apache.flink.table.plan.nodes.datastream
import java.util.{List => JList}
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.{AggregateCall, Window}
import org.apache.calcite.rel.core.Window.Group
+import org.apache.calcite.rel.core.{AggregateCall, Window}
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
-import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.functions.NullByteKeySelector
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.plan.nodes.OverAggregate
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
import org.apache.flink.table.runtime.aggregate._
import org.apache.flink.types.Row
-import org.apache.flink.api.java.functions.NullByteKeySelector
-import org.apache.flink.table.codegen.CodeGenerator
-import org.apache.flink.table.functions.{ProcTimeType, RowTimeType}
-import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
-
class DataStreamOverAggregate(
logicWindow: Window,
cluster: RelOptCluster,
traitSet: RelTraitSet,
inputNode: RelNode,
- rowRelDataType: RelDataType,
- inputType: RelDataType)
+ schema: RowSchema,
+ inputSchema: RowSchema)
extends SingleRel(cluster, traitSet, inputNode)
with OverAggregate
with DataStreamRel {
- override def deriveRowType(): RelDataType = rowRelDataType
+ override def deriveRowType(): RelDataType = schema.logicalType
override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode = {
new DataStreamOverAggregate(
@@ -57,8 +55,8 @@ class DataStreamOverAggregate(
cluster,
traitSet,
inputs.get(0),
- getRowType,
- inputType)
+ schema,
+ inputSchema)
}
override def toString: String = {
@@ -72,14 +70,16 @@ class DataStreamOverAggregate(
val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
super.explainTerms(pw)
- .itemIf("partitionBy", partitionToString(inputType, partitionKeys), partitionKeys.nonEmpty)
- .item("orderBy",orderingToString(inputType, overWindow.orderKeys.getFieldCollations))
- .itemIf("rows", windowRange(logicWindow, overWindow, getInput), overWindow.isRows)
- .itemIf("range", windowRange(logicWindow, overWindow, getInput), !overWindow.isRows)
+ .itemIf("partitionBy",
+ partitionToString(schema.logicalType, partitionKeys), partitionKeys.nonEmpty)
+ .item("orderBy",
+ orderingToString(schema.logicalType, overWindow.orderKeys.getFieldCollations))
+ .itemIf("rows", windowRange(logicWindow, overWindow, inputNode), overWindow.isRows)
+ .itemIf("range", windowRange(logicWindow, overWindow, inputNode), !overWindow.isRows)
.item(
"select", aggregationToString(
- inputType,
- getRowType,
+ inputSchema.logicalType,
+ schema.logicalType,
namedAggregates))
}
@@ -111,13 +111,13 @@ class DataStreamOverAggregate(
false,
inputDS.getType)
- val timeType = inputType
+ val timeType = schema.logicalType
.getFieldList
.get(orderKey.getFieldIndex)
- .getValue
+ .getType
timeType match {
- case _: ProcTimeType =>
+ case _ if FlinkTypeFactory.isProctimeIndicatorType(timeType) =>
// proc-time OVER window
if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
// unbounded OVER window
@@ -140,7 +140,8 @@ class DataStreamOverAggregate(
throw new TableException(
"OVER RANGE FOLLOWING windows are not supported yet.")
}
- case _: RowTimeType =>
+
+ case _ if FlinkTypeFactory.isRowtimeIndicatorType(timeType) =>
// row-time OVER window
if (overWindow.lowerBound.isPreceding &&
overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
@@ -158,17 +159,16 @@ class DataStreamOverAggregate(
inputDS,
isRowTimeType = true,
isRowsClause = overWindow.isRows
- )
+ )
} else {
throw new TableException(
"OVER RANGE FOLLOWING windows are not supported yet.")
}
+
case _ =>
throw new TableException(
- "Unsupported time type {$timeType}. " +
- "OVER windows do only support RowTimeType and ProcTimeType.")
+ s"OVER windows can only be applied on time attributes.")
}
-
}
def createUnboundedAndCurrentRowOverWindow(
@@ -178,16 +178,20 @@ class DataStreamOverAggregate(
isRowsClause: Boolean): DataStream[Row] = {
val overWindow: Group = logicWindow.groups.get(0)
- val partitionKeys: Array[Int] = overWindow.keys.toArray
- val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
-
- // get the output types
- val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+ val partitionKeys: Array[Int] = overWindow.keys.toArray.map(schema.mapIndex)
+ val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates.map {
+ namedAggregate =>
+ new CalcitePair[AggregateCall, String](
+ schema.mapAggregateCall(namedAggregate.left),
+ namedAggregate.right)
+ }
val processFunction = AggregateUtil.createUnboundedOverProcessFunction(
generator,
namedAggregates,
- inputType,
+ inputSchema.physicalType,
+ inputSchema.physicalTypeInfo,
+ inputSchema.physicalFieldTypeInfo,
isRowTimeType,
partitionKeys.nonEmpty,
isRowsClause)
@@ -198,7 +202,7 @@ class DataStreamOverAggregate(
inputDS
.keyBy(partitionKeys: _*)
.process(processFunction)
- .returns(rowTypeInfo)
+ .returns(schema.physicalTypeInfo)
.name(aggOpName)
.asInstanceOf[DataStream[Row]]
}
@@ -207,13 +211,13 @@ class DataStreamOverAggregate(
if (isRowTimeType) {
inputDS.keyBy(new NullByteKeySelector[Row])
.process(processFunction).setParallelism(1).setMaxParallelism(1)
- .returns(rowTypeInfo)
+ .returns(schema.physicalTypeInfo)
.name(aggOpName)
.asInstanceOf[DataStream[Row]]
} else {
inputDS
.process(processFunction).setParallelism(1).setMaxParallelism(1)
- .returns(rowTypeInfo)
+ .returns(schema.physicalTypeInfo)
.name(aggOpName)
.asInstanceOf[DataStream[Row]]
}
@@ -228,19 +232,26 @@ class DataStreamOverAggregate(
isRowsClause: Boolean): DataStream[Row] = {
val overWindow: Group = logicWindow.groups.get(0)
- val partitionKeys: Array[Int] = overWindow.keys.toArray
- val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+ val partitionKeys: Array[Int] = overWindow.keys.toArray.map(schema.mapIndex)
+ val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates.map {
+ namedAggregate =>
+ new CalcitePair[AggregateCall, String](
+ schema.mapAggregateCall(namedAggregate.left),
+ namedAggregate.right)
+ }
val precedingOffset =
- getLowerBoundary(logicWindow, overWindow, getInput()) + (if (isRowsClause) 1 else 0)
-
- // get the output types
- val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+ getLowerBoundary(
+ logicWindow,
+ overWindow,
+ input) + (if (isRowsClause) 1 else 0)
val processFunction = AggregateUtil.createBoundedOverProcessFunction(
generator,
namedAggregates,
- inputType,
+ inputSchema.physicalType,
+ inputSchema.physicalTypeInfo,
+ inputSchema.physicalFieldTypeInfo,
precedingOffset,
isRowsClause,
isRowTimeType
@@ -251,7 +262,7 @@ class DataStreamOverAggregate(
inputDS
.keyBy(partitionKeys: _*)
.process(processFunction)
- .returns(rowTypeInfo)
+ .returns(schema.physicalTypeInfo)
.name(aggOpName)
.asInstanceOf[DataStream[Row]]
}
@@ -260,7 +271,7 @@ class DataStreamOverAggregate(
inputDS
.keyBy(new NullByteKeySelector[Row])
.process(processFunction).setParallelism(1).setMaxParallelism(1)
- .returns(rowTypeInfo)
+ .returns(schema.physicalTypeInfo)
.name(aggOpName)
.asInstanceOf[DataStream[Row]]
}
@@ -282,17 +293,18 @@ class DataStreamOverAggregate(
s"over: (${
if (!partitionKeys.isEmpty) {
- s"PARTITION BY: ${partitionToString(inputType, partitionKeys)}, "
+ s"PARTITION BY: ${partitionToString(inputSchema.logicalType, partitionKeys)}, "
} else {
""
}
- }ORDER BY: ${orderingToString(inputType, overWindow.orderKeys.getFieldCollations)}, " +
+ }ORDER BY: ${orderingToString(inputSchema.logicalType,
+ overWindow.orderKeys.getFieldCollations)}, " +
s"${if (overWindow.isRows) "ROWS" else "RANGE"}" +
- s"${windowRange(logicWindow, overWindow, getInput)}, " +
+ s"${windowRange(logicWindow, overWindow, inputNode.asInstanceOf[DataStreamRel])}, " +
s"select: (${
aggregationToString(
- inputType,
- getRowType,
+ inputSchema.logicalType,
+ schema.logicalType,
namedAggregates)
}))"
}
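
The key change in this node: Calcite reports partition keys and aggregate-call arguments against the logical row type, which still contains time indicator fields, while the runtime operates on the physical row where those fields are projected away. A small illustration of the remapping (hypothetical values, not part of this commit; RowSchema itself is introduced further down in this patch):

    // logical input row (a, rowtime, b): a key on `b` has logical index 2,
    // but the rowtime indicator is dropped at runtime, so it shifts left.
    val logicalKeys  = Array(2)                  // as reported by Calcite
    val mapIndex     = Map(0 -> 0, 2 -> 1)       // what schema.mapIndex yields here
    val physicalKeys = logicalKeys.map(mapIndex) // Array(1)
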
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
index ae172a5..03938f3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
@@ -34,4 +34,3 @@ trait DataStreamRel extends FlinkRelNode {
def translateToPlan(tableEnv: StreamTableEnvironment) : DataStream[Row]
}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index c187ae8..05f60ba 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -24,7 +24,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.TableScan
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
-import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema}
import org.apache.flink.types.Row
/**
@@ -36,27 +36,27 @@ class DataStreamScan(
cluster: RelOptCluster,
traitSet: RelTraitSet,
table: RelOptTable,
- rowRelDataType: RelDataType)
+ schema: RowSchema)
extends TableScan(cluster, traitSet, table)
with StreamScan {
val dataStreamTable: DataStreamTable[Any] = getTable.unwrap(classOf[DataStreamTable[Any]])
- override def deriveRowType(): RelDataType = rowRelDataType
+ override def deriveRowType(): RelDataType = schema.logicalType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataStreamScan(
cluster,
traitSet,
getTable,
- getRowType
+ schema
)
}
override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
val config = tableEnv.getConfig
val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
- convertToInternalRow(inputDataStream, dataStreamTable, config)
+ convertToInternalRow(schema, inputDataStream, dataStreamTable, config)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
index f340ac7..47b4946 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamUnion.scala
@@ -19,14 +19,12 @@
package org.apache.flink.table.plan.nodes.datastream
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.types.Row
-import scala.collection.JavaConverters._
-
/**
* Flink RelNode which matches along with Union.
*
@@ -36,11 +34,11 @@ class DataStreamUnion(
traitSet: RelTraitSet,
leftNode: RelNode,
rightNode: RelNode,
- rowRelDataType: RelDataType)
+ schema: RowSchema)
extends BiRel(cluster, traitSet, leftNode, rightNode)
with DataStreamRel {
- override def deriveRowType() = rowRelDataType
+ override def deriveRowType() = schema.logicalType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataStreamUnion(
@@ -48,7 +46,7 @@ class DataStreamUnion(
traitSet,
inputs.get(0),
inputs.get(1),
- getRowType
+ schema
)
}
@@ -57,7 +55,7 @@ class DataStreamUnion(
}
override def toString = {
- s"Union All(union: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
+ s"Union All(union: (${schema.logicalFieldNames.mkString(", ")}))"
}
override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
@@ -68,6 +66,6 @@ class DataStreamUnion(
}
private def unionSelectionToString: String = {
- getRowType.getFieldNames.asScala.toList.mkString(", ")
+ schema.logicalFieldNames.mkString(", ")
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
index 0ab4a48..c964e03 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamValues.scala
@@ -21,13 +21,12 @@ package org.apache.flink.table.plan.nodes.datastream
import com.google.common.collect.ImmutableList
import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Values
import org.apache.calcite.rex.RexLiteral
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamTableEnvironment
-import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.io.ValuesInputFormat
import org.apache.flink.types.Row
@@ -39,19 +38,19 @@ import scala.collection.JavaConverters._
class DataStreamValues(
cluster: RelOptCluster,
traitSet: RelTraitSet,
- rowRelDataType: RelDataType,
+ schema: RowSchema,
tuples: ImmutableList[ImmutableList[RexLiteral]],
ruleDescription: String)
- extends Values(cluster, rowRelDataType, tuples, traitSet)
+ extends Values(cluster, schema.logicalType, tuples, traitSet)
with DataStreamRel {
- override def deriveRowType() = rowRelDataType
+ override def deriveRowType() = schema.logicalType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
new DataStreamValues(
cluster,
traitSet,
- getRowType,
+ schema,
getTuples,
ruleDescription
)
@@ -61,15 +60,13 @@ class DataStreamValues(
val config = tableEnv.getConfig
- val returnType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
-
val generator = new CodeGenerator(config)
// generate code for every record
val generatedRecords = getTuples.asScala.map { r =>
generator.generateResultExpression(
- returnType,
- getRowType.getFieldNames.asScala,
+ schema.physicalTypeInfo,
+ schema.physicalFieldNames,
r.asScala)
}
@@ -77,14 +74,14 @@ class DataStreamValues(
val generatedFunction = generator.generateValuesInputFormat(
ruleDescription,
generatedRecords.map(_.code),
- returnType)
+ schema.physicalTypeInfo)
val inputFormat = new ValuesInputFormat[Row](
generatedFunction.name,
generatedFunction.code,
generatedFunction.returnType)
- tableEnv.execEnv.createInput(inputFormat, returnType)
+ tableEnv.execEnv.createInput(inputFormat, schema.physicalTypeInfo)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
index 6d08302..dd82819 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
@@ -18,42 +18,46 @@
package org.apache.flink.table.plan.nodes.datastream
+import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.CommonScan
-import org.apache.flink.table.plan.schema.FlinkTable
+import org.apache.flink.table.plan.schema.{FlinkTable, RowSchema}
+import org.apache.flink.table.runtime.MapRunner
import org.apache.flink.types.Row
-import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
trait StreamScan extends CommonScan with DataStreamRel {
protected def convertToInternalRow(
+ schema: RowSchema,
input: DataStream[Any],
flinkTable: FlinkTable[_],
config: TableConfig)
: DataStream[Row] = {
- val inputType = input.getType
-
- val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
-
// conversion
- if (needsConversion(inputType, internalType)) {
+ if (needsConversion(input.getType, schema.physicalTypeInfo)) {
- val mapFunc = getConversionMapper(
+ val function = generatedConversionFunction(
config,
- inputType,
- internalType,
+ classOf[MapFunction[Any, Row]],
+ input.getType,
+ schema.physicalTypeInfo,
"DataStreamSourceConversion",
- getRowType.getFieldNames,
+ schema.physicalFieldNames,
Some(flinkTable.fieldIndexes))
- val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+ val runner = new MapRunner[Any, Row](
+ function.name,
+ function.code,
+ function.returnType)
+
+ val opName = s"from: (${schema.logicalFieldNames.mkString(", ")})"
- input.map(mapFunc).name(opName)
+ // TODO we need a ProcessFunction here
+ input.map(runner).name(opName)
}
// no conversion necessary, forward
else {
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 0a466a3..5dc3da8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -22,10 +22,11 @@ import org.apache.calcite.plan._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
-import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.{StreamTableSource, TableSource}
+import org.apache.flink.table.plan.schema.{RowSchema, TableSourceTable}
+import org.apache.flink.table.sources.{DefinedTimeAttributes, StreamTableSource, TableSource}
import org.apache.flink.types.Row
/** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
@@ -37,7 +38,50 @@ class StreamTableSourceScan(
extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource)
with StreamScan {
- override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+ override def deriveRowType() = {
+ val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+ def removeIndex[T](idx: Int, l: List[T]): List[T] = {
+ if (l.size < idx) {
+ l
+ } else {
+ l.take(idx) ++ l.drop(idx + 1)
+ }
+ }
+
+ var fieldNames = TableEnvironment.getFieldNames(tableSource).toList
+ var fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
+
+ val rowtime = tableSource match {
+ case timeSource: DefinedTimeAttributes if timeSource.getRowtimeAttribute != null =>
+ val rowtimeAttribute = timeSource.getRowtimeAttribute
+ // remove physical field if it is overwritten by time attribute
+ fieldNames = removeIndex(rowtimeAttribute.f0, fieldNames)
+ fieldTypes = removeIndex(rowtimeAttribute.f0, fieldTypes)
+ Some((rowtimeAttribute.f0, rowtimeAttribute.f1))
+ case _ =>
+ None
+ }
+
+ val proctime = tableSource match {
+ case timeSource: DefinedTimeAttributes if timeSource.getProctimeAttribute != null =>
+ val proctimeAttribute = timeSource.getProctimeAttribute
+ // remove physical field if it is overwritten by time attribute
+ fieldNames = removeIndex(proctimeAttribute.f0, fieldNames)
+ fieldTypes = removeIndex(proctimeAttribute.f0, fieldTypes)
+ Some((proctimeAttribute.f0, proctimeAttribute.f1))
+ case _ =>
+ None
+ }
+
+ flinkTypeFactory.buildLogicalRowType(
+ fieldNames,
+ fieldTypes,
+ rowtime,
+ proctime)
+ }
+
+ override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
val rowCnt = metadata.getRowCount(this)
planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType))
}
@@ -67,6 +111,10 @@ class StreamTableSourceScan(
override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
val config = tableEnv.getConfig
val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
- convertToInternalRow(inputDataStream, new TableSourceTable(tableSource), config)
+ convertToInternalRow(
+ new RowSchema(getRowType),
+ inputDataStream,
+ new TableSourceTable(tableSource),
+ config)
}
}
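
The removeIndex helper above drops a physical field whose position is taken over by a declared time attribute. A self-contained sketch of the same list surgery (with an explicit out-of-range guard; the example values are hypothetical):

    def removeIndex[T](idx: Int, l: List[T]): List[T] =
      if (idx >= l.size) l else l.take(idx) ++ l.drop(idx + 1)

    // physical fields ("id", "ts", "name"), rowtime attribute declared at index 1:
    println(removeIndex(1, List("id", "ts", "name"))) // List(id, name)
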
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala
index b1f991e..11b227f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalOverWindow.scala
@@ -45,7 +45,7 @@ class FlinkLogicalOverWindow(
traitSet,
inputs.get(0),
windowConstants,
- rowType,
+ getRowType,
windowGroups)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index eacbafa..53e7b31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -47,9 +47,11 @@ class FlinkLogicalTableSourceScan(
override def deriveRowType(): RelDataType = {
val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
- flinkTypeFactory.buildRowDataType(
+ flinkTypeFactory.buildLogicalRowType(
TableEnvironment.getFieldNames(tableSource),
- TableEnvironment.getFieldTypes(tableSource.getReturnType))
+ TableEnvironment.getFieldTypes(tableSource.getReturnType),
+ None,
+ None)
}
override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
index 4da2da9..7577deb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
@@ -53,8 +53,8 @@ class WindowStartEndPropertiesRule
transformed.push(LogicalWindowAggregate.create(
agg.getWindow,
Seq(
- NamedWindowProperty("w$start", WindowStart(agg.getWindow.alias)),
- NamedWindowProperty("w$end", WindowEnd(agg.getWindow.alias))
+ NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
+ NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))
), agg)
)
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
index f011b66..fc65403 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableException
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamAggregate
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalWindowAggregate
+import org.apache.flink.table.plan.schema.RowSchema
import scala.collection.JavaConversions._
@@ -65,8 +66,8 @@ class DataStreamAggregateRule
traitSet,
convInput,
agg.getNamedAggCalls,
- rel.getRowType,
- agg.getInput.getRowType,
+ new RowSchema(rel.getRowType),
+ new RowSchema(agg.getInput.getRowType),
agg.getGroupSet.toArray)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
index 1777264..0a1a31a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.convert.ConverterRule
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamCalc
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalCalc
+import org.apache.flink.table.plan.schema.RowSchema
class DataStreamCalcRule
extends ConverterRule(
@@ -42,7 +43,8 @@ class DataStreamCalcRule
rel.getCluster,
traitSet,
convInput,
- rel.getRowType,
+ new RowSchema(convInput.getRowType),
+ new RowSchema(rel.getRowType),
calc.getProgram,
description)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
index ae39d40..cd0663e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCorrelateRule.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.rex.RexNode
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamCorrelate
import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}
+import org.apache.flink.table.plan.schema.RowSchema
class DataStreamCorrelateRule
extends ConverterRule(
@@ -68,11 +69,12 @@ class DataStreamCorrelateRule
new DataStreamCorrelate(
rel.getCluster,
traitSet,
+ new RowSchema(convInput.getRowType),
convInput,
scan,
condition,
- rel.getRowType,
- join.getRowType,
+ new RowSchema(rel.getRowType),
+ new RowSchema(join.getRowType),
join.getJoinType,
description)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
index 175a202..28efcf5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -18,15 +18,15 @@
package org.apache.flink.table.plan.rules.datastream
-import java.math.BigDecimal
+import java.math.{BigDecimal => JBigDecimal}
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexBuilder, RexCall, RexLiteral, RexNode}
+import org.apache.calcite.rex._
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.flink.table.api.{TableException, Window}
import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
-import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.functions.TimeModeTypes
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.expressions.{Literal, UnresolvedFieldReference}
import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
@@ -49,16 +49,12 @@ class DataStreamLogicalWindowAggregateRule
val timeType = windowExpression.operands.get(0).getType
timeType match {
- case TimeModeTypes.ROWTIME =>
- rexBuilder.makeAbstractCast(
- TimeModeTypes.ROWTIME,
- rexBuilder.makeLiteral(0L, TimeModeTypes.ROWTIME, true))
- case TimeModeTypes.PROCTIME =>
- rexBuilder.makeAbstractCast(
- TimeModeTypes.PROCTIME,
- rexBuilder.makeLiteral(0L, TimeModeTypes.PROCTIME, true))
+
+ case _ if FlinkTypeFactory.isTimeIndicatorType(timeType) =>
+ rexBuilder.makeLiteral(0L, timeType, true)
+
case _ =>
- throw TableException(s"""Unexpected time type $timeType encountered""")
+ throw TableException(s"""Time attribute expected but $timeType encountered.""")
}
}
@@ -68,41 +64,41 @@ class DataStreamLogicalWindowAggregateRule
def getOperandAsLong(call: RexCall, idx: Int): Long =
call.getOperands.get(idx) match {
- case v : RexLiteral => v.getValue.asInstanceOf[BigDecimal].longValue()
- case _ => throw new TableException("Only constant window descriptors are supported")
+ case v: RexLiteral => v.getValue.asInstanceOf[JBigDecimal].longValue()
+ case _ => throw new TableException("Only constant window descriptors are supported.")
+ }
+
+ def getOperandAsTimeIndicator(call: RexCall, idx: Int): String =
+ call.getOperands.get(idx) match {
+ case v: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(v.getType) =>
+ rowType.getFieldList.get(v.getIndex).getName
+ case _ =>
+ throw new TableException("Window can only be defined over a time attribute column.")
}
windowExpr.getOperator match {
case SqlStdOperatorTable.TUMBLE =>
+ val time = getOperandAsTimeIndicator(windowExpr, 0)
val interval = getOperandAsLong(windowExpr, 1)
val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS))
- val window = windowExpr.getType match {
- case TimeModeTypes.PROCTIME => w
- case TimeModeTypes.ROWTIME => w.on("rowtime")
- }
- window.as("w$")
+ w.on(UnresolvedFieldReference(time)).as("w$")
case SqlStdOperatorTable.HOP =>
+ val time = getOperandAsTimeIndicator(windowExpr, 0)
val (slide, size) = (getOperandAsLong(windowExpr, 1), getOperandAsLong(windowExpr, 2))
val w = Slide
.over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
.every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
- val window = windowExpr.getType match {
- case TimeModeTypes.PROCTIME => w
- case TimeModeTypes.ROWTIME => w.on("rowtime")
- }
- window.as("w$")
+ w.on(UnresolvedFieldReference(time)).as("w$")
+
case SqlStdOperatorTable.SESSION =>
+ val time = getOperandAsTimeIndicator(windowExpr, 0)
val gap = getOperandAsLong(windowExpr, 1)
val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
- val window = windowExpr.getType match {
- case TimeModeTypes.PROCTIME => w
- case TimeModeTypes.ROWTIME => w.on("rowtime")
- }
- window.as("w$")
+ w.on(UnresolvedFieldReference(time)).as("w$")
}
}
}
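
After this change every SQL group window must be anchored on a time attribute column: the rule resolves operand 0 to a time indicator field and translates it into a Table API window via on(...). As a minimal sketch, a call like TUMBLE(t, INTERVAL '1' MINUTE) would become the following (using only classes imported in this file; the field name `t` is hypothetical):

    import org.apache.flink.table.api.scala.Tumble
    import org.apache.flink.table.expressions.{Literal, UnresolvedFieldReference}
    import org.apache.flink.table.typeutils.TimeIntervalTypeInfo

    val window = Tumble
      .over(Literal(60000L, TimeIntervalTypeInfo.INTERVAL_MILLIS)) // INTERVAL '1' MINUTE
      .on(UnresolvedFieldReference("t"))                           // the time attribute column
      .as("w$")
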
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
index 8e96970..b3d7603 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamOverAggregateRule.scala
@@ -25,6 +25,7 @@ import org.apache.calcite.rel.convert.ConverterRule
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalOverWindow
+import org.apache.flink.table.plan.schema.RowSchema
class DataStreamOverAggregateRule
extends ConverterRule(
@@ -46,8 +47,8 @@ class DataStreamOverAggregateRule
rel.getCluster,
traitSet,
convertInput,
- rel.getRowType,
- inputRowType)
+ new RowSchema(rel.getRowType),
+ new RowSchema(inputRowType))
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
index 5bf60a7..d8dda80 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamScanRule.scala
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.convert.ConverterRule
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamScan
-import org.apache.flink.table.plan.schema.DataStreamTable
+import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema}
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalNativeTableScan
class DataStreamScanRule
@@ -53,7 +53,7 @@ class DataStreamScanRule
rel.getCluster,
traitSet,
scan.getTable,
- rel.getRowType
+ new RowSchema(rel.getRowType)
)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
index 4241f53..8402f6d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamUnionRule.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.convert.ConverterRule
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamUnion
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalUnion
+import org.apache.flink.table.plan.schema.RowSchema
class DataStreamUnionRule
extends ConverterRule(
@@ -44,7 +45,7 @@ class DataStreamUnionRule
traitSet,
convLeft,
convRight,
- rel.getRowType)
+ new RowSchema(rel.getRowType))
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
index fbad21f..a1453a8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamValuesRule.scala
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.convert.ConverterRule
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamValues
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalValues
+import org.apache.flink.table.plan.schema.RowSchema
class DataStreamValuesRule
extends ConverterRule(
@@ -40,7 +41,7 @@ class DataStreamValuesRule
new DataStreamValues(
rel.getCluster,
traitSet,
- rel.getRowType,
+ new RowSchema(rel.getRowType),
values.getTuples,
description)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
index 6ce6570..70054b4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DataStreamTable.scala
@@ -18,13 +18,27 @@
package org.apache.flink.table.plan.schema
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.stats.FlinkStatistic
class DataStreamTable[T](
val dataStream: DataStream[T],
override val fieldIndexes: Array[Int],
override val fieldNames: Array[String],
+ val rowtime: Option[(Int, String)],
+ val proctime: Option[(Int, String)],
override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames, statistic) {
+
+ override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+ val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+
+ flinkTypeFactory.buildLogicalRowType(
+ fieldNames,
+ fieldTypes,
+ rowtime,
+ proctime)
+ }
}
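
For illustration, a sketch of how a DataStreamTable with an appended processing-time attribute could now be constructed (assumed setup; the field names and the tuple stream are hypothetical). Indices 0 and 1 reference physical fields of the stream, while the proctime attribute exists only at logical position 2:

    import org.apache.flink.streaming.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromElements((1, "a"))
    val table = new DataStreamTable(
      stream.javaStream,
      Array(0, 1),          // physical field indexes
      Array("id", "name"),  // physical field names
      rowtime = None,
      proctime = Some((2, "proctime"))) // logical-only field
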
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
index ea77061..752b00e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
@@ -48,10 +48,11 @@ abstract class FlinkTable[T](
val fieldTypes: Array[TypeInformation[_]] =
typeInfo match {
case cType: CompositeType[_] =>
- if (fieldNames.length != cType.getArity) {
+ // it is ok to leave out fields
+ if (fieldNames.length > cType.getArity) {
throw new TableException(
s"Arity of type (" + cType.getFieldNames.deep + ") " +
- "not equal to number of field names " + fieldNames.deep + ".")
+ "must not be greater than number of field names " + fieldNames.deep + ".")
}
fieldIndexes.map(cType.getTypeAt(_).asInstanceOf[TypeInformation[_]])
case aType: AtomicType[_] =>
@@ -64,7 +65,7 @@ abstract class FlinkTable[T](
override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
- flinkTypeFactory.buildRowDataType(fieldNames, fieldTypes)
+ flinkTypeFactory.buildLogicalRowType(fieldNames, fieldTypes, None, None)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
new file mode 100644
index 0000000..b42be82
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelRecordType}
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rex.{RexInputRef, RexNode, RexShuttle}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+ * Schema that describes both a logical and physical row.
+ */
+class RowSchema(private val logicalRowType: RelDataType) {
+
+ private lazy val physicalRowFields: Seq[RelDataTypeField] = logicalRowType.getFieldList filter {
+ field => !FlinkTypeFactory.isTimeIndicatorType(field.getType)
+ }
+
+ private lazy val physicalRowType: RelDataType = new RelRecordType(physicalRowFields)
+
+ private lazy val physicalRowFieldTypes: Seq[TypeInformation[_]] = physicalRowFields map { f =>
+ FlinkTypeFactory.toTypeInfo(f.getType)
+ }
+
+ private lazy val physicalRowFieldNames: Seq[String] = physicalRowFields.map(_.getName)
+
+ private lazy val physicalRowTypeInfo: TypeInformation[Row] = new RowTypeInfo(
+ physicalRowFieldTypes.toArray, physicalRowFieldNames.toArray)
+
+ private lazy val indexMapping: Array[Int] = generateIndexMapping
+
+ private lazy val inputRefUpdater = new RexInputRefUpdater()
+
+ private def generateIndexMapping: Array[Int] = {
+ val mapping = new Array[Int](logicalRowType.getFieldCount)
+ var countTimeIndicators = 0
+ var i = 0
+ while (i < logicalRowType.getFieldCount) {
+ val t = logicalRowType.getFieldList.get(i).getType
+ if (FlinkTypeFactory.isTimeIndicatorType(t)) {
+ countTimeIndicators += 1
+ // no mapping
+ mapping(i) = -1
+ } else {
+ mapping(i) = i - countTimeIndicators
+ }
+ i += 1
+ }
+ mapping
+ }
+
+ private class RexInputRefUpdater extends RexShuttle {
+
+ override def visitInputRef(inputRef: RexInputRef): RexNode = {
+ new RexInputRef(mapIndex(inputRef.getIndex), inputRef.getType)
+ }
+ }
+
+ /**
+ * Returns the arity of the logical record.
+ */
+ def logicalArity: Int = logicalRowType.getFieldCount
+
+ /**
+ * Returns the arity of the physical record.
+ */
+ def physicalArity: Int = physicalTypeInfo.getArity
+
+ /**
+ * Returns a logical [[RelDataType]] including logical fields (i.e. time indicators).
+ */
+ def logicalType: RelDataType = logicalRowType
+
+ /**
+ * Returns a physical [[RelDataType]] with no logical fields (i.e. time indicators).
+ */
+ def physicalType: RelDataType = physicalRowType
+
+ /**
+ * Returns a physical [[TypeInformation]] of row with no logical fields (i.e. time indicators).
+ */
+ def physicalTypeInfo: TypeInformation[Row] = physicalRowTypeInfo
+
+ /**
+ * Returns [[TypeInformation]] of the row's fields with no logical fields (i.e. time indicators).
+ */
+ def physicalFieldTypeInfo: Seq[TypeInformation[_]] = physicalRowFieldTypes
+
+ /**
+ * Returns the logical field names including logical fields (i.e. time indicators).
+ */
+ def logicalFieldNames: Seq[String] = logicalRowType.getFieldNames
+
+ /**
+ * Returns the physical field names with no logical fields (i.e. time indicators).
+ */
+ def physicalFieldNames: Seq[String] = physicalRowFieldNames
+
+ /**
+ * Converts logical indices to physical indices based on this schema.
+ */
+ def mapIndex(logicalIndex: Int): Int = {
+ val mappedIndex = indexMapping(logicalIndex)
+ if (mappedIndex < 0) {
+ throw new TableException("Invalid access to a logical field.")
+ } else {
+ mappedIndex
+ }
+ }
+
+ /**
+ * Converts logical indices of an aggregate call to physical ones.
+ */
+ def mapAggregateCall(logicalAggCall: AggregateCall): AggregateCall = {
+ logicalAggCall.copy(
+ logicalAggCall.getArgList.map(mapIndex(_).asInstanceOf[Integer]),
+ if (logicalAggCall.filterArg < 0) {
+ logicalAggCall.filterArg
+ } else {
+ mapIndex(logicalAggCall.filterArg)
+ }
+ )
+ }
+
+ /**
+ * Converts logical field references of a [[RexNode]] to physical ones.
+ */
+ def mapRexNode(logicalRexNode: RexNode): RexNode = logicalRexNode.accept(inputRefUpdater)
+
+}
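
A self-contained sketch of the index mapping that generateIndexMapping computes: time indicator positions map to -1, and every following field shifts left by the number of indicators seen so far (the boolean mask below stands in for FlinkTypeFactory.isTimeIndicatorType):

    def mapping(isTimeIndicator: Array[Boolean]): Array[Int] = {
      var dropped = 0
      isTimeIndicator.zipWithIndex.map { case (ti, i) =>
        if (ti) { dropped += 1; -1 } else i - dropped
      }
    }

    // logical row (a, rowtime, b) -> physical row (a, b):
    println(mapping(Array(false, true, false)).mkString(", ")) // 0, -1, 1
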
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
new file mode 100644
index 0000000..5e27061
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.RelDataTypeSystem
+import org.apache.calcite.sql.`type`.BasicSqlType
+
+/**
+ * Creates a time indicator type for event-time or processing-time, but with properties
+ * similar to a basic SQL type.
+ */
+class TimeIndicatorRelDataType(
+ typeSystem: RelDataTypeSystem,
+ originalType: BasicSqlType,
+ val isEventTime: Boolean)
+ extends BasicSqlType(
+ typeSystem,
+ originalType.getSqlTypeName,
+ originalType.getPrecision) {
+
+ override def equals(other: Any): Boolean = other match {
+ case that: TimeIndicatorRelDataType =>
+ super.equals(that) &&
+ isEventTime == that.isEventTime
+ case that: BasicSqlType =>
+ super.equals(that)
+ case _ => false
+ }
+
+ override def hashCode(): Int = {
+ super.hashCode() + 42 // we change the hash code to differentiate from regular timestamps
+ }
+}
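
A short sketch of how such a type can be told apart from a plain TIMESTAMP (this patch routes these checks through FlinkTypeFactory.isRowtimeIndicatorType / isProctimeIndicatorType; the helper below is illustrative only):

    import org.apache.calcite.rel.`type`.RelDataType

    def describe(t: RelDataType): String = t match {
      case ti: TimeIndicatorRelDataType if ti.isEventTime => "rowtime attribute"
      case _: TimeIndicatorRelDataType                    => "proctime attribute"
      case _                                              => "regular column"
    }
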
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
index 51e2fc5..32562c7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/MapRunner.scala
@@ -35,7 +35,7 @@ class MapRunner[IN, OUT](
val LOG = LoggerFactory.getLogger(this.getClass)
- private var function: MapFunction[IN, OUT] = null
+ private var function: MapFunction[IN, OUT] = _
override def open(parameters: Configuration): Unit = {
LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code")
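
MapRunner carries generated source code as a string and compiles it once per task in open(). A minimal sketch of that compile step, assuming Janino's SimpleCompiler (which Flink's code generation builds on):

    import org.codehaus.janino.SimpleCompiler

    def compile[T](cl: ClassLoader, name: String, code: String): Class[T] = {
      val compiler = new SimpleCompiler()
      compiler.setParentClassLoader(cl)
      compiler.cook(code) // compile the generated source
      compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
    }
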
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index e38207d..07992cd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -26,17 +26,18 @@ import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.fun._
import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction, AggregateFunction => DataStreamAggFunction, _}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.common.functions.{AggregateFunction => DataStreamAggFunction, _}
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
import org.apache.flink.table.api.TableException
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenerator
+import org.apache.flink.table.expressions.ExpressionUtils.isTimeIntervalLiteral
import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.aggfunctions._
import org.apache.flink.table.functions.utils.AggSqlFunction
@@ -61,26 +62,31 @@ object AggregateUtil {
* window to evaluate final aggregate value.
*
* @param generator code generator instance
- * @param namedAggregates List of calls to aggregate functions and their output field names
- * @param inputType Input row type
+ * @param namedAggregates Physical calls to aggregate functions and their output field names
+ * @param inputType Physical type of the row.
+ * @param inputTypeInfo Physical type information of the row.
+ * @param inputFieldTypeInfo Physical type information of the row's fields.
* @param isRowTimeType It is a tag that indicates whether the time type is rowTimeType
* @param isPartitioned It is a tag that indicate whether the input is partitioned
* @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause
*/
private[flink] def createUnboundedOverProcessFunction(
- generator: CodeGenerator,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- isRowTimeType: Boolean,
- isPartitioned: Boolean,
- isRowsClause: Boolean): ProcessFunction[Row, Row] = {
+ generator: CodeGenerator,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ inputTypeInfo: TypeInformation[Row],
+ inputFieldTypeInfo: Seq[TypeInformation[_]],
+ isRowTimeType: Boolean,
+ isPartitioned: Boolean,
+ isRowsClause: Boolean)
+ : ProcessFunction[Row, Row] = {
val needRetract = false
val (aggFields, aggregates) =
transformToAggregateFunctions(
namedAggregates.map(_.getKey),
inputType,
- needRetract)
+ needRetraction = false)
val aggregationStateType: RowTypeInfo =
createDataSetAggregateBufferDataType(Array(), aggregates, inputType)
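
For orientation, this helper backs unbounded OVER aggregations on streaming
tables. A hedged sketch of the kind of query it serves, assuming the 1.3-era
Scala APIs and the .proctime notation this commit series introduces (table and
field names are made up):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // 'proctime.proctime appends a processing-time indicator attribute.
    val t = env.fromElements((1, 2L), (1, 3L))
      .toTable(tEnv, 'a, 'b, 'proctime.proctime)
    tEnv.registerTable("MyTable", t)

    // Unbounded OVER aggregation: emits one result row per input row.
    val result = tEnv.sql(
      "SELECT a, SUM(b) OVER (PARTITION BY a ORDER BY proctime " +
      "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM MyTable")
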
@@ -92,7 +98,7 @@ object AggregateUtil {
val genFunction = generator.generateAggregations(
"UnboundedProcessingOverAggregateHelper",
generator,
- inputType,
+ inputFieldTypeInfo,
aggregates,
aggFields,
aggMapping,
@@ -112,13 +118,13 @@ object AggregateUtil {
new RowTimeUnboundedRowsOver(
genFunction,
aggregationStateType,
- FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+ inputTypeInfo)
} else {
// RANGE unbounded over process function
new RowTimeUnboundedRangeOver(
genFunction,
aggregationStateType,
- FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+ inputTypeInfo)
}
} else {
if (isPartitioned) {
@@ -138,20 +144,25 @@ object AggregateUtil {
* bounded OVER window to evaluate the final aggregate value.
*
* @param generator code generator instance
- * @param namedAggregates List of calls to aggregate functions and their output field names
- * @param inputType Input row type
+ * @param namedAggregates Physical calls to aggregate functions and their output field names
+ * @param inputType Physical type of the row.
+ * @param inputTypeInfo Physical type information of the row.
+ * @param inputFieldTypeInfo Physical type information of the row's fields.
* @param precedingOffset The preceding offset
* @param isRowsClause Flag indicating whether the OVER clause is a ROWS clause
* @param isRowTimeType Flag indicating whether the time type is row time
* @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
*/
private[flink] def createBoundedOverProcessFunction(
- generator: CodeGenerator,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- precedingOffset: Long,
- isRowsClause: Boolean,
- isRowTimeType: Boolean): ProcessFunction[Row, Row] = {
+ generator: CodeGenerator,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ inputTypeInfo: TypeInformation[Row],
+ inputFieldTypeInfo: Seq[TypeInformation[_]],
+ precedingOffset: Long,
+ isRowsClause: Boolean,
+ isRowTimeType: Boolean)
+ : ProcessFunction[Row, Row] = {
val needRetract = true
val (aggFields, aggregates) =
@@ -161,7 +172,6 @@ object AggregateUtil {
needRetract)
val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
- val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
val forwardMapping = (0 until inputType.getFieldCount).toArray
val aggMapping = aggregates.indices.map(x => x + inputType.getFieldCount).toArray
@@ -170,7 +180,7 @@ object AggregateUtil {
val genFunction = generator.generateAggregations(
"BoundedOverAggregateHelper",
generator,
- inputType,
+ inputFieldTypeInfo,
aggregates,
aggFields,
aggMapping,
@@ -189,14 +199,14 @@ object AggregateUtil {
new RowTimeBoundedRowsOver(
genFunction,
aggregationStateType,
- inputRowType,
+ inputTypeInfo,
precedingOffset
)
} else {
new RowTimeBoundedRangeOver(
genFunction,
aggregationStateType,
- inputRowType,
+ inputTypeInfo,
precedingOffset
)
}
@@ -206,13 +216,13 @@ object AggregateUtil {
genFunction,
precedingOffset,
aggregationStateType,
- inputRowType)
+ inputTypeInfo)
} else {
new ProcTimeBoundedRangeOver(
genFunction,
precedingOffset,
aggregationStateType,
- inputRowType)
+ inputTypeInfo)
}
}
}
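
Each branch above selects a runtime function parameterized by precedingOffset,
which carries the lower bound of the OVER frame. Reusing tEnv and the MyTable
registration from the earlier sketch, a hedged example of a query that is
translated through this path:

    // Bounded OVER aggregation with a ROWS frame over processing time.
    val bounded = tEnv.sql(
      "SELECT a, SUM(b) OVER (PARTITION BY a ORDER BY proctime " +
      "ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM MyTable")
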
@@ -241,12 +251,13 @@ object AggregateUtil {
* NOTE: this function is only used for time-based windows on batch tables.
*/
def createDataSetWindowPrepareMapFunction(
- generator: CodeGenerator,
- window: LogicalWindow,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- groupings: Array[Int],
- inputType: RelDataType,
- isParserCaseSensitive: Boolean)
+ generator: CodeGenerator,
+ window: LogicalWindow,
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ groupings: Array[Int],
+ inputType: RelDataType,
+ inputFieldTypeInfo: Seq[TypeInformation[_]],
+ isParserCaseSensitive: Boolean)
: MapFunction[Row, Row] = {
val needRetract = false
@@ -263,28 +274,28 @@ object AggregateUtil {
Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
val (timeFieldPos, tumbleTimeWindowSize) = window match {
- case EventTimeTumblingGroupWindow(_, time, size) if isTimeInterval(size.resultType) =>
- val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
- (timeFieldPos, Some(asLong(size)))
- case EventTimeTumblingGroupWindow(_, time, _) =>
+ case TumblingGroupWindow(_, time, size) =>
val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
- (timeFieldPos, None)
+ size match {
+ case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+ (timeFieldPos, Some(value))
+ case _ => (timeFieldPos, None)
+ }
- case EventTimeSessionGroupWindow(_, time, _) =>
- val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
- (timeFieldPos, None)
+ case SessionGroupWindow(_, time, _) =>
+ (getTimeFieldPosition(time, inputType, isParserCaseSensitive), None)
- case EventTimeSlidingGroupWindow(_, time, size, slide)
- if isTimeInterval(time.resultType) && doAllSupportPartialMerge(aggregates) =>
- // pre-tumble incremental aggregates on time-windows
+ case SlidingGroupWindow(_, time, size, slide) =>
val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
- val preTumblingSize = determineLargestTumblingSize(asLong(size), asLong(slide))
- (timeFieldPos, Some(preTumblingSize))
-
- case EventTimeSlidingGroupWindow(_, time, _, _) =>
- val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
- (timeFieldPos, None)
+ size match {
+ case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+ // pre-tumble incremental aggregates on time-windows
+ val timeFieldPos = getTimeFieldPosition(time, inputType, isParserCaseSensitive)
+ val preTumblingSize = determineLargestTumblingSize(asLong(size), asLong(slide))
+ (timeFieldPos, Some(preTumblingSize))
+ case _ => (timeFieldPos, None)
+ }
case _ =>
throw new UnsupportedOperationException(s"$window is currently not supported on batch")
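
The rewritten cases above no longer dispatch on separate event-time window
classes; they inspect the size expression directly. A standalone sketch of the
same pattern, assuming only the expression classes already used in this diff
(the helper name is illustrative):

    import org.apache.flink.table.expressions.{Expression, Literal}
    import org.apache.flink.table.typeutils.TimeIntervalTypeInfo

    // Yields the window size in milliseconds only when the size expression is
    // a time-interval literal; row-count literals yield None.
    def sizeAsMillis(size: Expression): Option[Long] = size match {
      case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => Some(value)
      case _ => None
    }
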
@@ -296,7 +307,7 @@ object AggregateUtil {
val genFunction = generator.generateAggregations(
"DataSetAggregatePrepareMapHelper",
generator,
- inputType,
+ inputFieldTypeInfo,
aggregates,
aggFieldIndexes,
aggMapping,
@@ -349,31 +360,32 @@ object AggregateUtil {
window: LogicalWindow,
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
groupings: Array[Int],
- inputType: RelDataType,
+ physicalInputRowType: RelDataType,
+ physicalInputTypes: Seq[TypeInformation[_]],
isParserCaseSensitive: Boolean)
: RichGroupReduceFunction[Row, Row] = {
val needRetract = false
val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
- inputType,
+ physicalInputRowType,
needRetract)
val returnType: RowTypeInfo = createDataSetAggregateBufferDataType(
groupings,
aggregates,
- inputType,
+ physicalInputRowType,
Some(Array(BasicTypeInfo.LONG_TYPE_INFO)))
val keysAndAggregatesArity = groupings.length + namedAggregates.length
window match {
- case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+ case SlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
// sliding time-window for partial aggregations
val genFunction = generator.generateAggregations(
"DataSetAggregatePrepareMapHelper",
generator,
- inputType,
+ physicalInputTypes,
aggregates,
aggFieldIndexes,
aggregates.indices.map(_ + groupings.length).toArray,
@@ -433,7 +445,7 @@ object AggregateUtil {
: FlatMapFunction[Row, Row] = {
window match {
- case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+ case SlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
new DataSetSlideTimeWindowAggFlatMapFunction(
inputType.getArity - 1,
asLong(size),
@@ -458,7 +470,8 @@ object AggregateUtil {
generator: CodeGenerator,
window: LogicalWindow,
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
+ physicalInputRowType: RelDataType,
+ physicalInputTypes: Seq[TypeInformation[_]],
outputType: RelDataType,
groupings: Array[Int],
properties: Seq[NamedWindowProperty],
@@ -468,7 +481,7 @@ object AggregateUtil {
val needRetract = false
val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
- inputType,
+ physicalInputRowType,
needRetract)
val aggMapping = aggregates.indices.toArray.map(_ + groupings.length)
@@ -476,7 +489,7 @@ object AggregateUtil {
val genPreAggFunction = generator.generateAggregations(
"GroupingWindowAggregateHelper",
generator,
- inputType,
+ physicalInputTypes,
aggregates,
aggFieldIndexes,
aggMapping,
@@ -493,7 +506,7 @@ object AggregateUtil {
val genFinalAggFunction = generator.generateAggregations(
"GroupingWindowAggregateHelper",
generator,
- inputType,
+ physicalInputTypes,
aggregates,
aggFieldIndexes,
aggMapping,
@@ -510,7 +523,7 @@ object AggregateUtil {
val keysAndAggregatesArity = groupings.length + namedAggregates.length
window match {
- case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
+ case TumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
// tumbling time window
val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
if (doAllSupportPartialMerge(aggregates)) {
@@ -532,13 +545,13 @@ object AggregateUtil {
endPos,
outputType.getFieldCount)
}
- case EventTimeTumblingGroupWindow(_, _, size) =>
+ case TumblingGroupWindow(_, _, size) =>
// tumbling count window
new DataSetTumbleCountWindowAggReduceGroupFunction(
genFinalAggFunction,
asLong(size))
- case EventTimeSessionGroupWindow(_, _, gap) =>
+ case SessionGroupWindow(_, _, gap) =>
val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
new DataSetSessionWindowAggReduceGroupFunction(
genFinalAggFunction,
@@ -548,7 +561,7 @@ object AggregateUtil {
asLong(gap),
isInputCombined)
- case EventTimeSlidingGroupWindow(_, _, size, _) if isTimeInterval(size.resultType) =>
+ case SlidingGroupWindow(_, _, size, _) if isTimeInterval(size.resultType) =>
val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
if (doAllSupportPartialMerge(aggregates)) {
// for partial aggregations
@@ -570,7 +583,7 @@ object AggregateUtil {
asLong(size))
}
- case EventTimeSlidingGroupWindow(_, _, size, _) =>
+ case SlidingGroupWindow(_, _, size, _) =>
new DataSetSlideWindowAggReduceGroupFunction(
genFinalAggFunction,
keysAndAggregatesArity,
@@ -608,13 +621,14 @@ object AggregateUtil {
generator: CodeGenerator,
window: LogicalWindow,
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
+ physicalInputRowType: RelDataType,
+ physicalInputTypes: Seq[TypeInformation[_]],
groupings: Array[Int]): MapPartitionFunction[Row, Row] = {
val needRetract = false
val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
- inputType,
+ physicalInputRowType,
needRetract)
val aggMapping = aggregates.indices.map(_ + groupings.length).toArray
@@ -622,18 +636,18 @@ object AggregateUtil {
val keysAndAggregatesArity = groupings.length + namedAggregates.length
window match {
- case EventTimeSessionGroupWindow(_, _, gap) =>
+ case SessionGroupWindow(_, _, gap) =>
val combineReturnType: RowTypeInfo =
createDataSetAggregateBufferDataType(
groupings,
aggregates,
- inputType,
+ physicalInputRowType,
Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))
val genFunction = generator.generateAggregations(
"GroupingWindowAggregateHelper",
generator,
- inputType,
+ physicalInputTypes,
aggregates,
aggFieldIndexes,
aggMapping,
@@ -679,14 +693,15 @@ object AggregateUtil {
generator: CodeGenerator,
window: LogicalWindow,
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
+ physicalInputRowType: RelDataType,
+ physicalInputTypes: Seq[TypeInformation[_]],
groupings: Array[Int])
: GroupCombineFunction[Row, Row] = {
val needRetract = false
val (aggFieldIndexes, aggregates) = transformToAggregateFunctions(
namedAggregates.map(_.getKey),
- inputType,
+ physicalInputRowType,
needRetract)
val aggMapping = aggregates.indices.map(_ + groupings.length).toArray
@@ -695,18 +710,18 @@ object AggregateUtil {
window match {
- case EventTimeSessionGroupWindow(_, _, gap) =>
+ case SessionGroupWindow(_, _, gap) =>
val combineReturnType: RowTypeInfo =
createDataSetAggregateBufferDataType(
groupings,
aggregates,
- inputType,
+ physicalInputRowType,
Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))
val genFunction = generator.generateAggregations(
"GroupingWindowAggregateHelper",
generator,
- inputType,
+ physicalInputTypes,
aggregates,
aggFieldIndexes,
aggMapping,
@@ -742,6 +757,7 @@ object AggregateUtil {
generator: CodeGenerator,
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
inputType: RelDataType,
+ inputFieldTypeInfo: Seq[TypeInformation[_]],
outputType: RelDataType,
groupings: Array[Int],
inGroupingSet: Boolean): (Option[DataSetPreAggFunction],
@@ -786,7 +802,7 @@ object AggregateUtil {
val genPreAggFunction = generator.generateAggregations(
"DataSetAggregatePrepareMapHelper",
generator,
- inputType,
+ inputFieldTypeInfo,
aggregates,
aggInFields,
aggregates.indices.map(_ + groupings.length).toArray,
@@ -813,7 +829,7 @@ object AggregateUtil {
val genFinalAggFunction = generator.generateAggregations(
"DataSetAggregateFinalHelper",
generator,
- inputType,
+ inputFieldTypeInfo,
aggregates,
aggInFields,
aggOutFields,
@@ -837,7 +853,7 @@ object AggregateUtil {
val genFunction = generator.generateAggregations(
"DataSetAggregateHelper",
generator,
- inputType,
+ inputFieldTypeInfo,
aggregates,
aggInFields,
aggOutFields,
@@ -914,6 +930,7 @@ object AggregateUtil {
generator: CodeGenerator,
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
inputType: RelDataType,
+ inputFieldTypeInfo: Seq[TypeInformation[_]],
outputType: RelDataType,
needMerge: Boolean)
: (DataStreamAggFunction[Row, Row, Row], RowTypeInfo, RowTypeInfo) = {
@@ -931,7 +948,7 @@ object AggregateUtil {
val genFunction = generator.generateAggregations(
"GroupingWindowAggregateHelper",
generator,
- inputType,
+ inputFieldTypeInfo,
aggregates,
aggFields,
aggMapping,
@@ -1047,12 +1064,9 @@ object AggregateUtil {
private def isTimeWindow(window: LogicalWindow) = {
window match {
- case ProcessingTimeTumblingGroupWindow(_, size) => isTimeInterval(size.resultType)
- case ProcessingTimeSlidingGroupWindow(_, size, _) => isTimeInterval(size.resultType)
- case ProcessingTimeSessionGroupWindow(_, _) => true
- case EventTimeTumblingGroupWindow(_, _, size) => isTimeInterval(size.resultType)
- case EventTimeSlidingGroupWindow(_, _, size, _) => isTimeInterval(size.resultType)
- case EventTimeSessionGroupWindow(_, _, _) => true
+ case TumblingGroupWindow(_, _, size) => isTimeIntervalLiteral(size)
+ case SlidingGroupWindow(_, _, size, _) => isTimeIntervalLiteral(size)
+ case SessionGroupWindow(_, _, _) => true
}
}
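
With the unified window classes, isTimeWindow keys on whether the size is a
time-interval literal instead of on the window class. In Table API terms the
distinction looks roughly like this (a sketch; the windows and table are
illustrative and assume the Scala Table API imports):

    import org.apache.flink.table.api.Table
    import org.apache.flink.table.api.scala._

    def classify(t: Table): Unit = {
      // Size is a time interval, so isTimeWindow(...) is true.
      val byTime = t.window(Tumble over 10.minutes on 'proctime as 'w)

      // Size is a row count, so isTimeWindow(...) is false.
      val byCount = t.window(Tumble over 10.rows on 'proctime as 'w)
    }
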
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
index 03ca02c..ef97e71 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
@@ -21,7 +21,7 @@ import java.util.{List => JList, ArrayList => JArrayList}
import org.apache.flink.api.common.state._
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.api.java.typeutils.ListTypeInfo
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler}
@@ -39,8 +39,8 @@ import org.slf4j.LoggerFactory
*/
class RowTimeBoundedRangeOver(
genAggregations: GeneratedAggregationsFunction,
- aggregationStateType: RowTypeInfo,
- inputRowType: RowTypeInfo,
+ aggregationStateType: TypeInformation[Row],
+ inputRowType: TypeInformation[Row],
precedingOffset: Long)
extends ProcessFunction[Row, Row]
with Compiler[GeneratedAggregations] {
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
index 4a9a14c..7169cf7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory
class RowTimeBoundedRowsOver(
genAggregations: GeneratedAggregationsFunction,
aggregationStateType: RowTypeInfo,
- inputRowType: RowTypeInfo,
+ inputRowType: TypeInformation[Row],
precedingOffset: Long)
extends ProcessFunction[Row, Row]
with Compiler[GeneratedAggregations] {
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
new file mode 100644
index 0000000..8466cdf
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import org.apache.flink.api.java.tuple.Tuple2
+
+/**
+ * Defines logical time attributes for a [[TableSource]]. Time attributes can be used to
+ * indicate, access, and work with Flink's event time or processing time. A
+ * [[TableSource]] that implements this interface can define the names and positions of the
+ * rowtime and proctime attributes in the rows it produces.
+ */
+trait DefinedTimeAttributes {
+
+ /**
+ * Defines the name and position (starting at 0) of a rowtime attribute that represents
+ * Flink's event time. Returns null if no rowtime attribute should be available. If the
+ * position is within the arity of the result row, the logical attribute overwrites the
+ * physical attribute. If the position is beyond the arity of the result row, the time
+ * attribute is appended logically.
+ */
+ def getRowtimeAttribute: Tuple2[Int, String]
+
+ /**
+ * Defines the name and position (starting at 0) of a proctime attribute that represents
+ * Flink's processing time. Returns null if no proctime attribute should be available. If
+ * the position is within the arity of the result row, the logical attribute overwrites the
+ * physical attribute. If the position is beyond the arity of the result row, the time
+ * attribute is appended logically.
+ */
+ def getProctimeAttribute: Tuple2[Int, String]
+
+}
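
A hedged sketch of a source using the new trait; the class name and positions
are made up, and a real implementation would also extend [[TableSource]]:

    import org.apache.flink.api.java.tuple.Tuple2
    import org.apache.flink.table.sources.DefinedTimeAttributes

    class OrderSource extends DefinedTimeAttributes {

      // Position 2 is beyond the arity of a two-field row, so the rowtime
      // attribute is appended logically under the name "rowtime".
      override def getRowtimeAttribute: Tuple2[Int, String] = new Tuple2(2, "rowtime")

      // Null: this source exposes no processing-time attribute.
      override def getProctimeAttribute: Tuple2[Int, String] = null
    }
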
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
new file mode 100644
index 0000000..31dcb5c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.typeutils
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.api.common.typeutils.TypeComparator
+import org.apache.flink.api.common.typeutils.base.{SqlTimestampComparator, SqlTimestampSerializer}
+
+/**
+ * Type information that indicates event time or processing time. Apart from the
+ * indicator flag, it behaves like a regular SQL timestamp.
+ */
+class TimeIndicatorTypeInfo(val isEventTime: Boolean)
+ extends SqlTimeTypeInfo[Timestamp](
+ classOf[Timestamp],
+ SqlTimestampSerializer.INSTANCE,
+ classOf[SqlTimestampComparator].asInstanceOf[Class[TypeComparator[Timestamp]]]) {
+
+ override def toString: String = "TimeIndicatorTypeInfo"
+}
+
+object TimeIndicatorTypeInfo {
+
+ val ROWTIME_INDICATOR = new TimeIndicatorTypeInfo(true)
+ val PROCTIME_INDICATOR = new TimeIndicatorTypeInfo(false)
+
+}
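
Both singletons serialize and compare exactly like java.sql.Timestamp values;
only the isEventTime flag distinguishes them. A short usage sketch:

    import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo

    val rowtime = TimeIndicatorTypeInfo.ROWTIME_INDICATOR
    val proctime = TimeIndicatorTypeInfo.PROCTIME_INDICATOR

    println(rowtime.isEventTime)  // true
    println(proctime.isEventTime) // false
    println(rowtime)              // TimeIndicatorTypeInfo
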
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index 40f0cf2..9896a8c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -17,7 +17,7 @@
*/
package org.apache.flink.table.typeutils
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO, INT_TYPE_INFO, STRING_TYPE_INFO}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo._
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
import org.apache.flink.table.validate._
@@ -29,6 +29,7 @@ object TypeCheckUtils {
* SQL type but NOT vice versa.
*/
def isAdvanced(dataType: TypeInformation[_]): Boolean = dataType match {
+ case _: TimeIndicatorTypeInfo => false
case _: BasicTypeInfo[_] => false
case _: SqlTimeTypeInfo[_] => false
case _: TimeIntervalTypeInfo[_] => false
@@ -64,6 +65,8 @@ object TypeCheckUtils {
def isInteger(dataType: TypeInformation[_]): Boolean = dataType == INT_TYPE_INFO
+ def isLong(dataType: TypeInformation[_]): Boolean = dataType == LONG_TYPE_INFO
+
def isArray(dataType: TypeInformation[_]): Boolean = dataType match {
case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => true
case _ => false
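
A small sketch of the updated checks: per the new case above, a time indicator
is not an "advanced" type because it maps directly to a SQL timestamp, and
isLong mirrors the existing isInteger helper:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}

    println(TypeCheckUtils.isAdvanced(TimeIndicatorTypeInfo.ROWTIME_INDICATOR)) // false
    println(TypeCheckUtils.isLong(BasicTypeInfo.LONG_TYPE_INFO))                // true
    println(TypeCheckUtils.isLong(BasicTypeInfo.INT_TYPE_INFO))                 // false
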