You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/24 12:31:09 UTC
[1/2] flink git commit: [FLINK-7137] [table] Rework nullability handling
Repository: flink
Updated Branches:
refs/heads/release-1.3 45a5f875b -> 8c87c4469
[FLINK-7137] [table] Rework nullability handling
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be8ca8a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be8ca8a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be8ca8a3
Branch: refs/heads/release-1.3
Commit: be8ca8a384604a2fb2bd74886f452e4a61ce9cfb
Parents: 45a5f87
Author: twalthr <tw...@apache.org>
Authored: Wed Jul 19 16:23:09 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon Jul 24 14:14:41 2017 +0200
----------------------------------------------------------------------
.../flink/table/calcite/FlinkTypeFactory.scala | 189 ++++++++++++-------
.../calcite/RelTimeIndicatorConverter.scala | 4 +-
.../flink/table/codegen/ExpressionReducer.scala | 12 +-
.../flink/table/expressions/aggregations.scala | 2 +-
.../apache/flink/table/expressions/array.scala | 2 +-
.../apache/flink/table/expressions/call.scala | 4 +-
.../apache/flink/table/expressions/cast.scala | 5 +-
.../flink/table/expressions/literals.scala | 2 +-
.../apache/flink/table/expressions/time.scala | 4 +-
.../table/functions/utils/AggSqlFunction.scala | 4 +-
.../functions/utils/ScalarSqlFunction.scala | 5 +-
.../logical/rel/LogicalWindowAggregate.scala | 2 +-
.../logical/FlinkLogicalWindowAggregate.scala | 2 +-
.../table/plan/schema/ArrayRelDataType.scala | 3 +-
.../plan/schema/CompositeRelDataType.scala | 13 +-
.../plan/schema/FlinkTableFunctionImpl.scala | 2 +-
.../table/plan/schema/GenericRelDataType.scala | 11 +-
.../plan/schema/TimeIndicatorRelDataType.scala | 6 +-
.../scala/batch/table/AggregationsITCase.scala | 17 ++
.../scala/stream/table/GroupWindowTest.scala | 4 +-
.../calcite/RelTimeIndicatorConverterTest.scala | 4 +-
21 files changed, 197 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index eba1623..6c23b9e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -49,45 +49,53 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
// NOTE: for future data types it might be necessary to
// override more methods of RelDataTypeFactoryImpl
- private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
-
- def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = {
- // simple type can be converted to SQL types and vice versa
- if (isSimple(typeInfo)) {
- val sqlType = typeInfoToSqlTypeName(typeInfo)
- sqlType match {
-
- case INTERVAL_YEAR_MONTH =>
- createSqlIntervalType(
- new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
-
- case INTERVAL_DAY_SECOND =>
- createSqlIntervalType(
- new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
-
- case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
- if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
- createRowtimeIndicatorType()
- } else {
- createProctimeIndicatorType()
- }
-
- case _ =>
- createSqlType(sqlType)
+ private val seenTypes = mutable.HashMap[(TypeInformation[_], Boolean), RelDataType]()
+
+ def createTypeFromTypeInfo(
+ typeInfo: TypeInformation[_],
+ isNullable: Boolean)
+ : RelDataType = {
+
+ // we cannot use seenTypes for simple types,
+ // because time indicators and timestamps would be the same
+
+ val relType = if (isSimple(typeInfo)) {
+ // simple types can be converted to SQL types and vice versa
+ val sqlType = typeInfoToSqlTypeName(typeInfo)
+ sqlType match {
+
+ case INTERVAL_YEAR_MONTH =>
+ createSqlIntervalType(
+ new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
+
+ case INTERVAL_DAY_SECOND =>
+ createSqlIntervalType(
+ new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
+
+ case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
+ if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
+ createRowtimeIndicatorType()
+ } else {
+ createProctimeIndicatorType()
+ }
+
+ case _ =>
+ createSqlType(sqlType)
+ }
+ } else {
+ // advanced types require specific RelDataType
+ // for storing the original TypeInformation
+ seenTypes.getOrElseUpdate((typeInfo, isNullable), createAdvancedType(typeInfo, isNullable))
}
- }
- // advanced types require specific RelDataType
- // for storing the original TypeInformation
- else {
- seenTypes.getOrElseUpdate(typeInfo, canonize(createAdvancedType(typeInfo)))
- }
+
+ createTypeWithNullability(relType, isNullable)
}
/**
* Creates an indicator type for processing-time, but with similar properties as SQL timestamp.
*/
def createProctimeIndicatorType(): RelDataType = {
- val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+ val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
canonize(
new TimeIndicatorRelDataType(
getTypeSystem,
@@ -100,7 +108,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
* Creates an indicator type for event-time, but with similar properties as SQL timestamp.
*/
def createRowtimeIndicatorType(): RelDataType = {
- val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+ val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
canonize(
new TimeIndicatorRelDataType(
getTypeSystem,
@@ -110,6 +118,56 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
}
/**
+ * Creates types that create custom [[RelDataType]]s that wrap Flink's [[TypeInformation]].
+ */
+ private def createAdvancedType(
+ typeInfo: TypeInformation[_],
+ isNullable: Boolean): RelDataType = {
+
+ val relType = typeInfo match {
+
+ case ct: CompositeType[_] =>
+ new CompositeRelDataType(ct, isNullable, this)
+
+ case pa: PrimitiveArrayTypeInfo[_] =>
+ new ArrayRelDataType(
+ pa,
+ createTypeFromTypeInfo(pa.getComponentType, isNullable = false),
+ isNullable)
+
+ case ba: BasicArrayTypeInfo[_, _] =>
+ new ArrayRelDataType(
+ ba,
+ createTypeFromTypeInfo(ba.getComponentInfo, isNullable = true),
+ isNullable)
+
+ case oa: ObjectArrayTypeInfo[_, _] =>
+ new ArrayRelDataType(
+ oa,
+ createTypeFromTypeInfo(oa.getComponentInfo, isNullable = true),
+ isNullable)
+
+ case mp: MapTypeInfo[_, _] =>
+ new MapRelDataType(
+ mp,
+ createTypeFromTypeInfo(mp.getKeyTypeInfo, isNullable = true),
+ createTypeFromTypeInfo(mp.getValueTypeInfo, isNullable = true),
+ isNullable)
+
+ case ti: TypeInformation[_] =>
+ new GenericRelDataType(
+ ti,
+ isNullable,
+ getTypeSystem.asInstanceOf[FlinkTypeSystem])
+
+ case ti@_ =>
+ throw TableException(s"Unsupported type information: $ti")
+ }
+
+ canonize(relType)
+ }
+
+ /**
* Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
*
* @param fieldNames field names
@@ -150,13 +208,15 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
addedTimeAttributes += 1
} else {
val field = fields(i - addedTimeAttributes)
- logicalRowTypeBuilder.add(field._1, createTypeFromTypeInfo(field._2)).nullable(true)
+ logicalRowTypeBuilder.add(field._1, createTypeFromTypeInfo(field._2, isNullable = true))
}
}
logicalRowTypeBuilder.build
}
+ // ----------------------------------------------------------------------------------------------
+
override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
// it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue
// always set those to default value
@@ -167,49 +227,48 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
}
}
- override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType =
- new ArrayRelDataType(
+ override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType = {
+ val relType = new ArrayRelDataType(
ObjectArrayTypeInfo.getInfoFor(FlinkTypeFactory.toTypeInfo(elementType)),
elementType,
- true)
+ isNullable = false)
- private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
- case ct: CompositeType[_] =>
- new CompositeRelDataType(ct, this)
+ canonize(relType)
+ }
- case pa: PrimitiveArrayTypeInfo[_] =>
- new ArrayRelDataType(pa, createTypeFromTypeInfo(pa.getComponentType), false)
+ override def createTypeWithNullability(
+ relDataType: RelDataType,
+ isNullable: Boolean): RelDataType = {
- case ba: BasicArrayTypeInfo[_, _] =>
- new ArrayRelDataType(ba, createTypeFromTypeInfo(ba.getComponentInfo), true)
+ // nullability change not necessary
+ if (relDataType.isNullable == isNullable) {
+ return canonize(relDataType)
+ }
- case oa: ObjectArrayTypeInfo[_, _] =>
- new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true)
+ // change nullability
+ val newType = relDataType match {
- case mp: MapTypeInfo[_, _] =>
- new MapRelDataType(mp, createTypeFromTypeInfo(mp.getKeyTypeInfo),
- createTypeFromTypeInfo(mp.getValueTypeInfo), true)
+ case composite: CompositeRelDataType =>
+ new CompositeRelDataType(composite.compositeType, isNullable, this)
- case ti: TypeInformation[_] =>
- new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
+ case array: ArrayRelDataType =>
+ new ArrayRelDataType(array.typeInfo, array.getComponentType, isNullable)
- case ti@_ =>
- throw TableException(s"Unsupported type information: $ti")
- }
+ case map: MapRelDataType =>
+ new MapRelDataType(map.typeInfo, map.keyType, map.valueType, isNullable)
+
+ case generic: GenericRelDataType =>
+ new GenericRelDataType(generic.typeInfo, isNullable, typeSystem)
+
+ case timeIndicator: TimeIndicatorRelDataType =>
+ timeIndicator
- override def createTypeWithNullability(
- relDataType: RelDataType,
- nullable: Boolean)
- : RelDataType = relDataType match {
- case composite: CompositeRelDataType =>
- // at the moment we do not care about nullability
- canonize(composite)
- case array: ArrayRelDataType =>
- val elementType = createTypeWithNullability(array.getComponentType, nullable)
- canonize(new ArrayRelDataType(array.typeInfo, elementType, nullable))
case _ =>
- super.createTypeWithNullability(relDataType, nullable)
+ super.createTypeWithNullability(relDataType, isNullable)
}
+
+ canonize(newType)
+ }
}
object FlinkTypeFactory {
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index b28e3f8..ac4dc13 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -44,7 +44,7 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
private val timestamp = rexBuilder
.getTypeFactory
.asInstanceOf[FlinkTypeFactory]
- .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+ .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
override def visit(intersect: LogicalIntersect): RelNode =
throw new TableException("Logical intersect in a stream environment is not supported yet.")
@@ -399,7 +399,7 @@ object RelTimeIndicatorConverter {
.getRexBuilder
.getTypeFactory
.asInstanceOf[FlinkTypeFactory]
- .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+ .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
// convert all time indicators types to timestamps
val fields = rootRel.getRowType.getFieldList.map { field =>
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index b7e1335..3241b5e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -53,15 +53,21 @@ class ExpressionReducer(config: TableConfig)
// we need to cast here for RexBuilder.makeLiteral
case (SqlTypeName.DATE, e) =>
Some(
- rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e)
+ rexBuilder.makeCast(
+ typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO, e.getType.isNullable),
+ e)
)
case (SqlTypeName.TIME, e) =>
Some(
- rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e)
+ rexBuilder.makeCast(
+ typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO, e.getType.isNullable),
+ e)
)
case (SqlTypeName.TIMESTAMP, e) =>
Some(
- rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO), e)
+ rexBuilder.makeCast(
+ typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, e.getType.isNullable),
+ e)
)
// we don't support object literals yet, we skip those constant expressions
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
index 6d906b9..d38880c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
@@ -67,7 +67,7 @@ case class Sum(child: Expression) extends Aggregation {
override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
val returnType = relBuilder
.getTypeFactory.asInstanceOf[FlinkTypeFactory]
- .createTypeFromTypeInfo(resultType)
+ .createTypeFromTypeInfo(resultType, isNullable = true)
new SqlSumAggFunction(returnType)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
index 7211733..3288478 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
@@ -38,7 +38,7 @@ case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
val relDataType = relBuilder
.asInstanceOf[FlinkRelBuilder]
.getTypeFactory
- .createTypeFromTypeInfo(resultType)
+ .createTypeFromTypeInfo(resultType, isNullable = false)
val values = elements.map(_.toRexNode).toList.asJava
relBuilder
.getRexBuilder
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
index 13f8a11..5d1a60e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -104,7 +104,7 @@ case class OverCall(
val operator: SqlAggFunction = agg.asInstanceOf[Aggregation].getSqlAggFunction()
val aggResultType = relBuilder
.getTypeFactory.asInstanceOf[FlinkTypeFactory]
- .createTypeFromTypeInfo(agg.resultType)
+ .createTypeFromTypeInfo(agg.resultType, isNullable = true)
// assemble exprs by agg children
val aggExprs = agg.asInstanceOf[Aggregation].children.map(_.toRexNode(relBuilder)).asJava
@@ -151,7 +151,7 @@ case class OverCall(
case b: Literal =>
val returnType = relBuilder
.getTypeFactory.asInstanceOf[FlinkTypeFactory]
- .createTypeFromTypeInfo(Types.DECIMAL)
+ .createTypeFromTypeInfo(Types.DECIMAL, isNullable = true)
val sqlOperator = new SqlPostfixOperator(
sqlKind.name,
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
index 312bf12..ba08ccb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
@@ -30,12 +30,13 @@ case class Cast(child: Expression, resultType: TypeInformation[_]) extends Unary
override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+ val childRexNode = child.toRexNode
relBuilder
.getRexBuilder
// we use abstract cast here because RelBuilder.cast() has too many side effects
.makeAbstractCast(
- typeFactory.createTypeFromTypeInfo(resultType),
- child.toRexNode)
+ typeFactory.createTypeFromTypeInfo(resultType, childRexNode.getType.isNullable),
+ childRexNode)
}
override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
index 053e7ed..eb9c4f5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -121,7 +121,7 @@ case class Null(resultType: TypeInformation[_]) extends LeafExpression {
val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
rexBuilder
.makeCast(
- typeFactory.createTypeFromTypeInfo(resultType),
+ typeFactory.createTypeFromTypeInfo(resultType, isNullable = true),
rexBuilder.constantNull())
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
index f09e2ad..98e3030 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -101,7 +101,9 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
// TODO convert this into Table API expressions to make the code more readable
val rexBuilder = relBuilder.getRexBuilder
- val resultType = relBuilder.getTypeFactory().createTypeFromTypeInfo(LONG_TYPE_INFO)
+ val resultType = relBuilder
+ .getTypeFactory()
+ .createTypeFromTypeInfo(LONG_TYPE_INFO, isNullable = true)
var result = rexBuilder.makeReinterpretCast(
resultType,
temporal,
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
index 816bc52..4b22de7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
@@ -92,7 +92,7 @@ object AggSqlFunction {
s"Operand types of ${signatureToString(operandTypeInfo)} could not be inferred."))
val inferredTypes = getParameterTypes(aggregateFunction, foundSignature.drop(1))
- .map(typeFactory.createTypeFromTypeInfo)
+ .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
for (i <- operandTypes.indices) {
if (i < inferredTypes.length - 1) {
@@ -115,7 +115,7 @@ object AggSqlFunction {
new SqlReturnTypeInference {
override def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = {
- typeFactory.createTypeFromTypeInfo(resultType)
+ typeFactory.createTypeFromTypeInfo(resultType, isNullable = true)
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
index bbfa3aa..8f71d72 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
@@ -84,8 +84,7 @@ object ScalarSqlFunction {
s"Expected: ${signaturesToString(scalarFunction, "eval")}")
}
val resultType = getResultTypeOfScalarFunction(scalarFunction, foundSignature.get)
- val t = typeFactory.createTypeFromTypeInfo(resultType)
- typeFactory.createTypeWithNullability(t, nullable = true)
+ typeFactory.createTypeFromTypeInfo(resultType, isNullable = true)
}
}
}
@@ -110,7 +109,7 @@ object ScalarSqlFunction {
val inferredTypes = scalarFunction
.getParameterTypes(foundSignature)
- .map(typeFactory.createTypeFromTypeInfo)
+ .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
for (i <- operandTypes.indices) {
if (i < inferredTypes.length - 1) {
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
index 4443d6c..81f6bf0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
@@ -76,7 +76,7 @@ class LogicalWindowAggregate(
namedProperties.foreach { namedProp =>
builder.add(
namedProp.name,
- typeFactory.createTypeFromTypeInfo(namedProp.property.resultType)
+ typeFactory.createTypeFromTypeInfo(namedProp.property.resultType, isNullable = false)
)
}
builder.build()
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index 8154738..3e605e8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -81,7 +81,7 @@ class FlinkLogicalWindowAggregate(
namedProperties.foreach { namedProp =>
builder.add(
namedProp.name,
- typeFactory.createTypeFromTypeInfo(namedProp.property.resultType)
+ typeFactory.createTypeFromTypeInfo(namedProp.property.resultType, isNullable = false)
)
}
builder.build()
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
index f7d9e1d..ed64c62 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
@@ -42,7 +42,8 @@ class ArrayRelDataType(
case that: ArrayRelDataType =>
super.equals(that) &&
(that canEqual this) &&
- typeInfo == that.typeInfo
+ typeInfo == that.typeInfo &&
+ isNullable == that.isNullable
case _ => false
}
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
index a60514b..e0c6b6f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
@@ -31,10 +31,12 @@ import scala.collection.JavaConverters._
* Composite type for encapsulating Flink's [[CompositeType]].
*
* @param compositeType CompositeType to encapsulate
+ * @param nullable flag if type can be nullable
* @param typeFactory Flink's type factory
*/
class CompositeRelDataType(
val compositeType: CompositeType[_],
+ val nullable: Boolean,
typeFactory: FlinkTypeFactory)
extends RelRecordType(StructKind.PEEK_FIELDS, createFieldList(compositeType, typeFactory)) {
@@ -46,7 +48,8 @@ class CompositeRelDataType(
case that: CompositeRelDataType =>
super.equals(that) &&
(that canEqual this) &&
- compositeType == that.compositeType
+ compositeType == that.compositeType &&
+ nullable == that.nullable
case _ => false
}
@@ -54,6 +57,8 @@ class CompositeRelDataType(
compositeType.hashCode()
}
+ override def isNullable: Boolean = nullable
+
}
object CompositeRelDataType {
@@ -73,11 +78,11 @@ object CompositeRelDataType {
new RelDataTypeFieldImpl(
name,
index,
- typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index)))
- .asInstanceOf[RelDataTypeField]
+ // TODO the composite type should provide the information if subtypes are nullable
+ typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index), isNullable = true)
+ ).asInstanceOf[RelDataTypeField]
}
.toList
.asJava
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
index 1c05883..27fc2ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
@@ -78,7 +78,7 @@ class FlinkTableFunctionImpl[T](
fieldNames
.zip(fieldTypes)
.foreach { f =>
- builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true)
+ builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2, isNullable = true))
}
builder.build
}
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
index d93908b..84dd669 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
@@ -18,23 +18,27 @@
package org.apache.flink.table.plan.schema
+import org.apache.calcite.rel.`type`.RelDataTypeSystem
import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.calcite.FlinkTypeSystem
/**
* Generic type for encapsulating Flink's [[TypeInformation]].
*
* @param typeInfo TypeInformation to encapsulate
+ * @param nullable flag if type can be nullable
* @param typeSystem Flink's type system
*/
class GenericRelDataType(
val typeInfo: TypeInformation[_],
- typeSystem: FlinkTypeSystem)
+ val nullable: Boolean,
+ typeSystem: RelDataTypeSystem)
extends BasicSqlType(
typeSystem,
SqlTypeName.ANY) {
+ isNullable = nullable
+
override def toString = s"ANY($typeInfo)"
def canEqual(other: Any): Boolean = other.isInstanceOf[GenericRelDataType]
@@ -43,7 +47,8 @@ class GenericRelDataType(
case that: GenericRelDataType =>
super.equals(that) &&
(that canEqual this) &&
- typeInfo == that.typeInfo
+ typeInfo == that.typeInfo &&
+ nullable == that.nullable
case _ => false
}
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
index 5e27061..ace881a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
@@ -38,12 +38,14 @@ class TimeIndicatorRelDataType(
case that: TimeIndicatorRelDataType =>
super.equals(that) &&
isEventTime == that.isEventTime
- case that: BasicSqlType =>
- super.equals(that)
case _ => false
}
override def hashCode(): Int = {
super.hashCode() + 42 // we change the hash code to differentiate from regular timestamps
}
+
+ override def toString: String = {
+ s"TIME ATTRIBUTE(${if (isEventTime) "ROWTIME" else "PROCTIME"})"
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
index 45abf09..dd0f487 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
@@ -42,6 +42,23 @@ class AggregationsITCase(
extends TableProgramsCollectionTestBase(configMode) {
@Test
+ def testAggregationWithCaseClass(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val inputTable = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv, 'a, 'b)
+ tEnv.registerDataSet("MyTable", inputTable)
+
+ val result = tEnv.scan("MyTable")
+ .where('a.get("_1") > 0)
+ .select('a.get("_1").avg, 'a.get("_2").sum, 'b.count)
+
+ val expected = "2,6,3"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
def testAggregationTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
index 593b036..bb2a498 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
@@ -950,8 +950,8 @@ class GroupWindowTest extends TableTestBase {
),
term("select",
"string",
- "+(CAST(TMP_0), 1) AS s1",
- "+(CAST(TMP_0), 3) AS s2",
+ "+(TMP_0, 1) AS s1",
+ "+(TMP_0, 3) AS s2",
"TMP_1 AS x",
"TMP_1 AS x2",
"TMP_2 AS x3",
http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
index c315d94..8c004df 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
@@ -211,8 +211,8 @@ class RelTimeIndicatorConverterTest extends TableTestBase {
term("invocation",
s"${func.functionIdentifier}(TIME_MATERIALIZATION($$0), TIME_MATERIALIZATION($$3), '')"),
term("function", func),
- term("rowType", "RecordType(TIMESTAMP(3) rowtime, BIGINT long, INTEGER int, " +
- "TIMESTAMP(3) proctime, VARCHAR(2147483647) s)"),
+ term("rowType", "RecordType(TIME ATTRIBUTE(ROWTIME) rowtime, BIGINT long, INTEGER int, " +
+ "TIME ATTRIBUTE(PROCTIME) proctime, VARCHAR(2147483647) s)"),
term("joinType", "INNER")
),
term("select",
[2/2] flink git commit: [FLINK-7137] [table] Backport of tests for
Flink 1.3
Posted by tw...@apache.org.
[FLINK-7137] [table] Backport of tests for Flink 1.3
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c87c446
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c87c446
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c87c446
Branch: refs/heads/release-1.3
Commit: 8c87c44692bc27fb8018adf587715a9488947799
Parents: be8ca8a
Author: twalthr <tw...@apache.org>
Authored: Mon Jul 24 14:27:07 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon Jul 24 14:27:07 2017 +0200
----------------------------------------------------------------------
.../table/expressions/ScalarOperatorsTest.scala | 17 +++++++++++++++--
1 file changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8c87c446/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
index 0639835..5a82a8f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
@@ -138,6 +138,17 @@ class ScalarOperatorsTest extends ExpressionTestBase {
@Test
def testOtherExpressions(): Unit = {
+
+ // nested field null type
+ testSqlApi("CASE WHEN f13.f1 IS NULL THEN 'a' ELSE 'b' END", "a")
+ testSqlApi("CASE WHEN f13.f1 IS NOT NULL THEN 'a' ELSE 'b' END", "b")
+ testAllApis('f13.isNull, "f13.isNull", "f13 IS NULL", "false")
+ testAllApis('f13.isNotNull, "f13.isNotNull", "f13 IS NOT NULL", "true")
+ testAllApis('f13.get("f0").isNull, "f13.get('f0').isNull", "f13.f0 IS NULL", "false")
+ testAllApis('f13.get("f0").isNotNull, "f13.get('f0').isNotNull", "f13.f0 IS NOT NULL", "true")
+ testAllApis('f13.get("f1").isNull, "f13.get('f1').isNull", "f13.f1 IS NULL", "true")
+ testAllApis('f13.get("f1").isNotNull, "f13.get('f1').isNotNull", "f13.f1 IS NOT NULL", "false")
+
// boolean literals
testAllApis(
true,
@@ -239,7 +250,7 @@ class ScalarOperatorsTest extends ExpressionTestBase {
// ----------------------------------------------------------------------------------------------
def testData = {
- val testData = new Row(13)
+ val testData = new Row(14)
testData.setField(0, 1: Byte)
testData.setField(1, 1: Short)
testData.setField(2, 1)
@@ -253,6 +264,7 @@ class ScalarOperatorsTest extends ExpressionTestBase {
testData.setField(10, "String")
testData.setField(11, false)
testData.setField(12, null)
+ testData.setField(13, Row.of("foo", null))
testData
}
@@ -270,7 +282,8 @@ class ScalarOperatorsTest extends ExpressionTestBase {
Types.INT,
Types.STRING,
Types.BOOLEAN,
- Types.BOOLEAN
+ Types.BOOLEAN,
+ Types.ROW(Types.STRING, Types.STRING)
).asInstanceOf[TypeInformation[Any]]
}