You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/24 12:07:03 UTC
[1/3] flink git commit: [FLINK-7137] [table] Rework nullability handling
Repository: flink
Updated Branches:
refs/heads/master bb118104b -> c0bad3b80
[FLINK-7137] [table] Rework nullability handling
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7aa11565
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7aa11565
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7aa11565
Branch: refs/heads/master
Commit: 7aa115658b23c19fbcc8e3d1d83113608ebd7ce7
Parents: 2d1e08a
Author: twalthr <tw...@apache.org>
Authored: Wed Jul 19 16:23:09 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon Jul 24 13:59:15 2017 +0200
----------------------------------------------------------------------
.../flink/table/calcite/FlinkTypeFactory.scala | 195 ++++++++++++-------
.../calcite/RelTimeIndicatorConverter.scala | 10 +-
.../flink/table/codegen/ExpressionReducer.scala | 12 +-
.../flink/table/expressions/aggregations.scala | 2 +-
.../apache/flink/table/expressions/array.scala | 2 +-
.../apache/flink/table/expressions/call.scala | 4 +-
.../apache/flink/table/expressions/cast.scala | 5 +-
.../flink/table/expressions/literals.scala | 2 +-
.../apache/flink/table/expressions/time.scala | 4 +-
.../table/functions/utils/AggSqlFunction.scala | 4 +-
.../functions/utils/ScalarSqlFunction.scala | 5 +-
.../logical/rel/LogicalWindowAggregate.scala | 2 +-
.../logical/FlinkLogicalWindowAggregate.scala | 2 +-
.../table/plan/schema/ArrayRelDataType.scala | 3 +-
.../plan/schema/CompositeRelDataType.scala | 13 +-
.../plan/schema/FlinkTableFunctionImpl.scala | 2 +-
.../table/plan/schema/GenericRelDataType.scala | 11 +-
.../plan/schema/TimeIndicatorRelDataType.scala | 6 +-
.../api/stream/table/GroupWindowTest.scala | 4 +-
.../plan/TimeIndicatorConversionTest.scala | 4 +-
.../runtime/batch/table/AggregateITCase.scala | 17 ++
21 files changed, 200 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index b63a3ad..6c23b9e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -49,48 +49,53 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
// NOTE: for future data types it might be necessary to
// override more methods of RelDataTypeFactoryImpl
- private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
-
- def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType =
- createTypeFromTypeInfo(typeInfo, nullable = false)
-
- def createTypeFromTypeInfo(typeInfo: TypeInformation[_], nullable: Boolean): RelDataType = {
- // simple type can be converted to SQL types and vice versa
- if (isSimple(typeInfo)) {
- val sqlType = typeInfoToSqlTypeName(typeInfo)
- sqlType match {
-
- case INTERVAL_YEAR_MONTH =>
- createSqlIntervalType(
- new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
-
- case INTERVAL_DAY_SECOND =>
- createSqlIntervalType(
- new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
-
- case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
- if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
- createRowtimeIndicatorType()
- } else {
- createProctimeIndicatorType()
- }
-
- case _ =>
- createTypeWithNullability(createSqlType(sqlType), nullable)
+ private val seenTypes = mutable.HashMap[(TypeInformation[_], Boolean), RelDataType]()
+
+ def createTypeFromTypeInfo(
+ typeInfo: TypeInformation[_],
+ isNullable: Boolean)
+ : RelDataType = {
+
+ // we cannot use seenTypes for simple types,
+ // because time indicators and timestamps would be the same
+
+ val relType = if (isSimple(typeInfo)) {
+ // simple types can be converted to SQL types and vice versa
+ val sqlType = typeInfoToSqlTypeName(typeInfo)
+ sqlType match {
+
+ case INTERVAL_YEAR_MONTH =>
+ createSqlIntervalType(
+ new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
+
+ case INTERVAL_DAY_SECOND =>
+ createSqlIntervalType(
+ new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
+
+ case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
+ if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
+ createRowtimeIndicatorType()
+ } else {
+ createProctimeIndicatorType()
+ }
+
+ case _ =>
+ createSqlType(sqlType)
+ }
+ } else {
+ // advanced types require specific RelDataType
+ // for storing the original TypeInformation
+ seenTypes.getOrElseUpdate((typeInfo, isNullable), createAdvancedType(typeInfo, isNullable))
}
- }
- // advanced types require specific RelDataType
- // for storing the original TypeInformation
- else {
- seenTypes.getOrElseUpdate(typeInfo, canonize(createAdvancedType(typeInfo)))
- }
+
+ createTypeWithNullability(relType, isNullable)
}
/**
* Creates a indicator type for processing-time, but with similar properties as SQL timestamp.
*/
def createProctimeIndicatorType(): RelDataType = {
- val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+ val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
canonize(
new TimeIndicatorRelDataType(
getTypeSystem,
@@ -103,7 +108,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
* Creates a indicator type for event-time, but with similar properties as SQL timestamp.
*/
def createRowtimeIndicatorType(): RelDataType = {
- val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+ val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
canonize(
new TimeIndicatorRelDataType(
getTypeSystem,
@@ -113,6 +118,56 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
}
/**
+ * Creates types that create custom [[RelDataType]]s that wrap Flink's [[TypeInformation]].
+ */
+ private def createAdvancedType(
+ typeInfo: TypeInformation[_],
+ isNullable: Boolean): RelDataType = {
+
+ val relType = typeInfo match {
+
+ case ct: CompositeType[_] =>
+ new CompositeRelDataType(ct, isNullable, this)
+
+ case pa: PrimitiveArrayTypeInfo[_] =>
+ new ArrayRelDataType(
+ pa,
+ createTypeFromTypeInfo(pa.getComponentType, isNullable = false),
+ isNullable)
+
+ case ba: BasicArrayTypeInfo[_, _] =>
+ new ArrayRelDataType(
+ ba,
+ createTypeFromTypeInfo(ba.getComponentInfo, isNullable = true),
+ isNullable)
+
+ case oa: ObjectArrayTypeInfo[_, _] =>
+ new ArrayRelDataType(
+ oa,
+ createTypeFromTypeInfo(oa.getComponentInfo, isNullable = true),
+ isNullable)
+
+ case mp: MapTypeInfo[_, _] =>
+ new MapRelDataType(
+ mp,
+ createTypeFromTypeInfo(mp.getKeyTypeInfo, isNullable = true),
+ createTypeFromTypeInfo(mp.getValueTypeInfo, isNullable = true),
+ isNullable)
+
+ case ti: TypeInformation[_] =>
+ new GenericRelDataType(
+ ti,
+ isNullable,
+ getTypeSystem.asInstanceOf[FlinkTypeSystem])
+
+ case ti@_ =>
+ throw TableException(s"Unsupported type information: $ti")
+ }
+
+ canonize(relType)
+ }
+
+ /**
* Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
*
* @param fieldNames field names
@@ -153,13 +208,15 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
addedTimeAttributes += 1
} else {
val field = fields(i - addedTimeAttributes)
- logicalRowTypeBuilder.add(field._1, createTypeFromTypeInfo(field._2)).nullable(true)
+ logicalRowTypeBuilder.add(field._1, createTypeFromTypeInfo(field._2, isNullable = true))
}
}
logicalRowTypeBuilder.build
}
+ // ----------------------------------------------------------------------------------------------
+
override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
// it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue
// always set those to default value
@@ -170,52 +227,48 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
}
}
- override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType =
- new ArrayRelDataType(
+ override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType = {
+ val relType = new ArrayRelDataType(
ObjectArrayTypeInfo.getInfoFor(FlinkTypeFactory.toTypeInfo(elementType)),
elementType,
- true)
+ isNullable = false)
- private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
- case ct: CompositeType[_] =>
- new CompositeRelDataType(ct, this)
+ canonize(relType)
+ }
- case pa: PrimitiveArrayTypeInfo[_] =>
- new ArrayRelDataType(pa, createTypeFromTypeInfo(pa.getComponentType), false)
+ override def createTypeWithNullability(
+ relDataType: RelDataType,
+ isNullable: Boolean): RelDataType = {
- case ba: BasicArrayTypeInfo[_, _] =>
- new ArrayRelDataType(ba, createTypeFromTypeInfo(ba.getComponentInfo), true)
+ // nullability change not necessary
+ if (relDataType.isNullable == isNullable) {
+ return canonize(relDataType)
+ }
- case oa: ObjectArrayTypeInfo[_, _] =>
- new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true)
+ // change nullability
+ val newType = relDataType match {
- case mp: MapTypeInfo[_, _] =>
- new MapRelDataType(mp, createTypeFromTypeInfo(mp.getKeyTypeInfo),
- createTypeFromTypeInfo(mp.getValueTypeInfo), true)
+ case composite: CompositeRelDataType =>
+ new CompositeRelDataType(composite.compositeType, isNullable, this)
- case ti: TypeInformation[_] =>
- createTypeWithNullability(
- new GenericRelDataType(ti, getTypeSystem.asInstanceOf[FlinkTypeSystem]),
- nullable = true
- )
+ case array: ArrayRelDataType =>
+ new ArrayRelDataType(array.typeInfo, array.getComponentType, isNullable)
- case ti@_ =>
- throw TableException(s"Unsupported type information: $ti")
- }
+ case map: MapRelDataType =>
+ new MapRelDataType(map.typeInfo, map.keyType, map.valueType, isNullable)
+
+ case generic: GenericRelDataType =>
+ new GenericRelDataType(generic.typeInfo, isNullable, typeSystem)
+
+ case timeIndicator: TimeIndicatorRelDataType =>
+ timeIndicator
- override def createTypeWithNullability(
- relDataType: RelDataType,
- nullable: Boolean)
- : RelDataType = relDataType match {
- case composite: CompositeRelDataType =>
- // at the moment we do not care about nullability
- canonize(composite)
- case array: ArrayRelDataType =>
- val elementType = createTypeWithNullability(array.getComponentType, nullable)
- canonize(new ArrayRelDataType(array.typeInfo, elementType, nullable))
case _ =>
- super.createTypeWithNullability(relDataType, nullable)
+ super.createTypeWithNullability(relDataType, isNullable)
}
+
+ canonize(newType)
+ }
}
object FlinkTypeFactory {
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index d76613e..eb14291 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -42,9 +42,9 @@ import scala.collection.mutable
class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
private val timestamp = rexBuilder
- .getTypeFactory
- .asInstanceOf[FlinkTypeFactory]
- .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+ .getTypeFactory
+ .asInstanceOf[FlinkTypeFactory]
+ .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
override def visit(intersect: LogicalIntersect): RelNode =
throw new TableException("Logical intersect in a stream environment is not supported yet.")
@@ -341,7 +341,7 @@ object RelTimeIndicatorConverter {
.getRexBuilder
.getTypeFactory
.asInstanceOf[FlinkTypeFactory]
- .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+ .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
// convert all time indicators types to timestamps
val fields = rootRel.getRowType.getFieldList.map { field =>
@@ -381,7 +381,7 @@ class RexTimeIndicatorMaterializer(
private val timestamp = rexBuilder
.getTypeFactory
.asInstanceOf[FlinkTypeFactory]
- .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+ .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
override def visitInputRef(inputRef: RexInputRef): RexNode = {
// reference is interesting
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index cf36417..3e71c99 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -53,15 +53,21 @@ class ExpressionReducer(config: TableConfig)
// we need to cast here for RexBuilder.makeLiteral
case (SqlTypeName.DATE, e) =>
Some(
- rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e)
+ rexBuilder.makeCast(
+ typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO, e.getType.isNullable),
+ e)
)
case (SqlTypeName.TIME, e) =>
Some(
- rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e)
+ rexBuilder.makeCast(
+ typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO, e.getType.isNullable),
+ e)
)
case (SqlTypeName.TIMESTAMP, e) =>
Some(
- rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO), e)
+ rexBuilder.makeCast(
+ typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, e.getType.isNullable),
+ e)
)
// we don't support object literals yet, we skip those constant expressions
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
index a9901a3..c2d1bdf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
@@ -67,7 +67,7 @@ case class Sum(child: Expression) extends Aggregation {
override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
val returnType = relBuilder
.getTypeFactory.asInstanceOf[FlinkTypeFactory]
- .createTypeFromTypeInfo(resultType)
+ .createTypeFromTypeInfo(resultType, isNullable = true)
new SqlSumAggFunction(returnType)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
index 7211733..3288478 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
@@ -38,7 +38,7 @@ case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
val relDataType = relBuilder
.asInstanceOf[FlinkRelBuilder]
.getTypeFactory
- .createTypeFromTypeInfo(resultType)
+ .createTypeFromTypeInfo(resultType, isNullable = false)
val values = elements.map(_.toRexNode).toList.asJava
relBuilder
.getRexBuilder
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
index bd4fa2f..eb4b402 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -104,7 +104,7 @@ case class OverCall(
val operator: SqlAggFunction = agg.asInstanceOf[Aggregation].getSqlAggFunction()
val aggResultType = relBuilder
.getTypeFactory.asInstanceOf[FlinkTypeFactory]
- .createTypeFromTypeInfo(agg.resultType)
+ .createTypeFromTypeInfo(agg.resultType, isNullable = true)
// assemble exprs by agg children
val aggExprs = agg.asInstanceOf[Aggregation].children.map(_.toRexNode(relBuilder)).asJava
@@ -151,7 +151,7 @@ case class OverCall(
case b: Literal =>
val returnType = relBuilder
.getTypeFactory.asInstanceOf[FlinkTypeFactory]
- .createTypeFromTypeInfo(Types.DECIMAL)
+ .createTypeFromTypeInfo(Types.DECIMAL, isNullable = true)
val sqlOperator = new SqlPostfixOperator(
sqlKind.name,
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
index 312bf12..ba08ccb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
@@ -30,12 +30,13 @@ case class Cast(child: Expression, resultType: TypeInformation[_]) extends Unary
override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+ val childRexNode = child.toRexNode
relBuilder
.getRexBuilder
// we use abstract cast here because RelBuilder.cast() has to many side effects
.makeAbstractCast(
- typeFactory.createTypeFromTypeInfo(resultType),
- child.toRexNode)
+ typeFactory.createTypeFromTypeInfo(resultType, childRexNode.getType.isNullable),
+ childRexNode)
}
override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
index 053e7ed..eb9c4f5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -121,7 +121,7 @@ case class Null(resultType: TypeInformation[_]) extends LeafExpression {
val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
rexBuilder
.makeCast(
- typeFactory.createTypeFromTypeInfo(resultType),
+ typeFactory.createTypeFromTypeInfo(resultType, isNullable = true),
rexBuilder.constantNull())
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
index 250ec0a..0a02666 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -103,7 +103,9 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
// TODO convert this into Table API expressions to make the code more readable
val rexBuilder = relBuilder.getRexBuilder
- val resultType = relBuilder.getTypeFactory().createTypeFromTypeInfo(LONG_TYPE_INFO)
+ val resultType = relBuilder
+ .getTypeFactory()
+ .createTypeFromTypeInfo(LONG_TYPE_INFO, isNullable = true)
var result = rexBuilder.makeReinterpretCast(
resultType,
temporal,
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
index 4197760..b7d9991 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
@@ -97,7 +97,7 @@ object AggSqlFunction {
s"Operand types of ${signatureToString(operandTypeInfo)} could not be inferred."))
val inferredTypes = getParameterTypes(aggregateFunction, foundSignature.drop(1))
- .map(typeFactory.createTypeFromTypeInfo)
+ .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
for (i <- operandTypes.indices) {
if (i < inferredTypes.length - 1) {
@@ -120,7 +120,7 @@ object AggSqlFunction {
new SqlReturnTypeInference {
override def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = {
- typeFactory.createTypeFromTypeInfo(resultType)
+ typeFactory.createTypeFromTypeInfo(resultType, isNullable = true)
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
index b1b45cd..0776f7a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
@@ -85,8 +85,7 @@ object ScalarSqlFunction {
s"Expected: ${signaturesToString(scalarFunction, "eval")}")
}
val resultType = getResultTypeOfScalarFunction(scalarFunction, foundSignature.get)
- val t = typeFactory.createTypeFromTypeInfo(resultType)
- typeFactory.createTypeWithNullability(t, nullable = true)
+ typeFactory.createTypeFromTypeInfo(resultType, isNullable = true)
}
}
}
@@ -111,7 +110,7 @@ object ScalarSqlFunction {
val inferredTypes = scalarFunction
.getParameterTypes(foundSignature)
- .map(typeFactory.createTypeFromTypeInfo)
+ .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
for (i <- operandTypes.indices) {
if (i < inferredTypes.length - 1) {
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
index 4443d6c..81f6bf0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
@@ -76,7 +76,7 @@ class LogicalWindowAggregate(
namedProperties.foreach { namedProp =>
builder.add(
namedProp.name,
- typeFactory.createTypeFromTypeInfo(namedProp.property.resultType)
+ typeFactory.createTypeFromTypeInfo(namedProp.property.resultType, isNullable = false)
)
}
builder.build()
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index 8154738..3e605e8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -81,7 +81,7 @@ class FlinkLogicalWindowAggregate(
namedProperties.foreach { namedProp =>
builder.add(
namedProp.name,
- typeFactory.createTypeFromTypeInfo(namedProp.property.resultType)
+ typeFactory.createTypeFromTypeInfo(namedProp.property.resultType, isNullable = false)
)
}
builder.build()
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
index f7d9e1d..ed64c62 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
@@ -42,7 +42,8 @@ class ArrayRelDataType(
case that: ArrayRelDataType =>
super.equals(that) &&
(that canEqual this) &&
- typeInfo == that.typeInfo
+ typeInfo == that.typeInfo &&
+ isNullable == that.isNullable
case _ => false
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
index 3694cc5..e0c6b6f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
@@ -31,10 +31,12 @@ import scala.collection.JavaConverters._
* Composite type for encapsulating Flink's [[CompositeType]].
*
* @param compositeType CompositeType to encapsulate
+ * @param nullable flag if type can be nullable
* @param typeFactory Flink's type factory
*/
class CompositeRelDataType(
val compositeType: CompositeType[_],
+ val nullable: Boolean,
typeFactory: FlinkTypeFactory)
extends RelRecordType(StructKind.PEEK_FIELDS, createFieldList(compositeType, typeFactory)) {
@@ -46,7 +48,8 @@ class CompositeRelDataType(
case that: CompositeRelDataType =>
super.equals(that) &&
(that canEqual this) &&
- compositeType == that.compositeType
+ compositeType == that.compositeType &&
+ nullable == that.nullable
case _ => false
}
@@ -54,6 +57,8 @@ class CompositeRelDataType(
compositeType.hashCode()
}
+ override def isNullable: Boolean = nullable
+
}
object CompositeRelDataType {
@@ -73,11 +78,11 @@ object CompositeRelDataType {
new RelDataTypeFieldImpl(
name,
index,
- typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index), nullable = true))
- .asInstanceOf[RelDataTypeField]
+ // TODO the composite type should provide the information if subtypes are nullable
+ typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index), isNullable = true)
+ ).asInstanceOf[RelDataTypeField]
}
.toList
.asJava
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
index 1c05883..27fc2ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
@@ -78,7 +78,7 @@ class FlinkTableFunctionImpl[T](
fieldNames
.zip(fieldTypes)
.foreach { f =>
- builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true)
+ builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2, isNullable = true))
}
builder.build
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
index d93908b..84dd669 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
@@ -18,23 +18,27 @@
package org.apache.flink.table.plan.schema
+import org.apache.calcite.rel.`type`.RelDataTypeSystem
import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.calcite.FlinkTypeSystem
/**
* Generic type for encapsulating Flink's [[TypeInformation]].
*
* @param typeInfo TypeInformation to encapsulate
+ * @param nullable flag if type can be nullable
* @param typeSystem Flink's type system
*/
class GenericRelDataType(
val typeInfo: TypeInformation[_],
- typeSystem: FlinkTypeSystem)
+ val nullable: Boolean,
+ typeSystem: RelDataTypeSystem)
extends BasicSqlType(
typeSystem,
SqlTypeName.ANY) {
+ isNullable = nullable
+
override def toString = s"ANY($typeInfo)"
def canEqual(other: Any): Boolean = other.isInstanceOf[GenericRelDataType]
@@ -43,7 +47,8 @@ class GenericRelDataType(
case that: GenericRelDataType =>
super.equals(that) &&
(that canEqual this) &&
- typeInfo == that.typeInfo
+ typeInfo == that.typeInfo &&
+ nullable == that.nullable
case _ => false
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
index 5e27061..ace881a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
@@ -38,12 +38,14 @@ class TimeIndicatorRelDataType(
case that: TimeIndicatorRelDataType =>
super.equals(that) &&
isEventTime == that.isEventTime
- case that: BasicSqlType =>
- super.equals(that)
case _ => false
}
override def hashCode(): Int = {
super.hashCode() + 42 // we change the hash code to differentiate from regular timestamps
}
+
+ override def toString: String = {
+ s"TIME ATTRIBUTE(${if (isEventTime) "ROWTIME" else "PROCTIME"})"
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
index a024460..599c76b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
@@ -772,8 +772,8 @@ class GroupWindowTest extends TableTestBase {
),
term("select",
"string",
- "+(CAST(TMP_0), 1) AS s1",
- "+(CAST(TMP_0), 3) AS s2",
+ "+(TMP_0, 1) AS s1",
+ "+(TMP_0, 3) AS s2",
"TMP_1 AS x",
"TMP_1 AS x2",
"TMP_2 AS x3",
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index 90c8ea4..870025e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -161,8 +161,8 @@ class TimeIndicatorConversionTest extends TableTestBase {
term("invocation",
s"${func.functionIdentifier}(TIME_MATERIALIZATION($$0), TIME_MATERIALIZATION($$3), '')"),
term("function", func),
- term("rowType", "RecordType(TIMESTAMP(3) rowtime, BIGINT long, INTEGER int, " +
- "TIMESTAMP(3) proctime, VARCHAR(2147483647) s)"),
+ term("rowType", "RecordType(TIME ATTRIBUTE(ROWTIME) rowtime, BIGINT long, INTEGER int, " +
+ "TIME ATTRIBUTE(PROCTIME) proctime, VARCHAR(2147483647) s)"),
term("joinType", "INNER")
),
term("select",
http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala
index 457142c..d563f96 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala
@@ -42,6 +42,23 @@ class AggregationsITCase(
extends TableProgramsCollectionTestBase(configMode) {
@Test
+ def testAggregationWithCaseClass(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val inputTable = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv, 'a, 'b)
+ tEnv.registerDataSet("MyTable", inputTable)
+
+ val result = tEnv.scan("MyTable")
+ .where('a.get("_1") > 0)
+ .select('a.get("_1").avg, 'a.get("_2").sum, 'b.count)
+
+ val expected = "2,6,3"
+ val results = result.toDataSet[Row].collect()
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
+ @Test
def testAggregationTypes(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
[2/3] flink git commit: [FLINK-7137] [table] Fix nullability for nested types
Posted by tw...@apache.org.
[FLINK-7137] [table] Fix nullability for nested types
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d1e08a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d1e08a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d1e08a0
Branch: refs/heads/master
Commit: 2d1e08a02d84f8d7cb2734e09741eae72bf63b7d
Parents: bb11810
Author: Rong Rong <ro...@uber.com>
Authored: Wed Jul 12 21:29:16 2017 -0700
Committer: twalthr <tw...@apache.org>
Committed: Mon Jul 24 13:59:15 2017 +0200
----------------------------------------------------------------------
.../apache/flink/table/calcite/FlinkTypeFactory.scala | 12 +++++++++---
.../flink/table/plan/schema/CompositeRelDataType.scala | 2 +-
.../flink/table/expressions/ScalarOperatorsTest.scala | 11 +++++++++++
.../expressions/utils/ScalarOperatorsTestBase.scala | 6 ++++--
4 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1e08a0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index eba1623..b63a3ad 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -51,7 +51,10 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
- def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = {
+ def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType =
+ createTypeFromTypeInfo(typeInfo, nullable = false)
+
+ def createTypeFromTypeInfo(typeInfo: TypeInformation[_], nullable: Boolean): RelDataType = {
// simple type can be converted to SQL types and vice versa
if (isSimple(typeInfo)) {
val sqlType = typeInfoToSqlTypeName(typeInfo)
@@ -73,7 +76,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
}
case _ =>
- createSqlType(sqlType)
+ createTypeWithNullability(createSqlType(sqlType), nullable)
}
}
// advanced types require specific RelDataType
@@ -191,7 +194,10 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
createTypeFromTypeInfo(mp.getValueTypeInfo), true)
case ti: TypeInformation[_] =>
- new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
+ createTypeWithNullability(
+ new GenericRelDataType(ti, getTypeSystem.asInstanceOf[FlinkTypeSystem]),
+ nullable = true
+ )
case ti@_ =>
throw TableException(s"Unsupported type information: $ti")
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1e08a0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
index a60514b..3694cc5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
@@ -73,7 +73,7 @@ object CompositeRelDataType {
new RelDataTypeFieldImpl(
name,
index,
- typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index)))
+ typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index), nullable = true))
.asInstanceOf[RelDataTypeField]
}
.toList
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1e08a0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
index 738413e..6cb9fa8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
@@ -138,6 +138,17 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
@Test
def testOtherExpressions(): Unit = {
+
+ // nested field null type
+ testSqlApi("CASE WHEN f13.f1 IS NULL THEN 'a' ELSE 'b' END", "a")
+ testSqlApi("CASE WHEN f13.f1 IS NOT NULL THEN 'a' ELSE 'b' END", "b")
+ testAllApis('f13.isNull, "f13.isNull", "f13 IS NULL", "false")
+ testAllApis('f13.isNotNull, "f13.isNotNull", "f13 IS NOT NULL", "true")
+ testAllApis('f13.get("f0").isNull, "f13.get('f0').isNull", "f13.f0 IS NULL", "false")
+ testAllApis('f13.get("f0").isNotNull, "f13.get('f0').isNotNull", "f13.f0 IS NOT NULL", "true")
+ testAllApis('f13.get("f1").isNull, "f13.get('f1').isNull", "f13.f1 IS NULL", "true")
+ testAllApis('f13.get("f1").isNotNull, "f13.get('f1').isNotNull", "f13.f1 IS NOT NULL", "false")
+
// boolean literals
testAllApis(
true,
http://git-wip-us.apache.org/repos/asf/flink/blob/2d1e08a0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
index 2d22843..b719390 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.types.Row
class ScalarOperatorsTestBase extends ExpressionTestBase {
def testData: Row = {
- val testData = new Row(13)
+ val testData = new Row(14)
testData.setField(0, 1: Byte)
testData.setField(1, 1: Short)
testData.setField(2, 1)
@@ -41,6 +41,7 @@ class ScalarOperatorsTestBase extends ExpressionTestBase {
testData.setField(10, "String")
testData.setField(11, false)
testData.setField(12, null)
+ testData.setField(13, Row.of("foo", null))
testData
}
@@ -58,7 +59,8 @@ class ScalarOperatorsTestBase extends ExpressionTestBase {
Types.INT,
Types.STRING,
Types.BOOLEAN,
- Types.BOOLEAN
+ Types.BOOLEAN,
+ Types.ROW(Types.STRING, Types.STRING)
).asInstanceOf[TypeInformation[Any]]
}
[3/3] flink git commit: [FLINK-7137] [table] Add unit tests to verify composite type nullable and nested field nullable on aggregations
Posted by tw...@apache.org.
[FLINK-7137] [table] Add unit tests to verify composite type nullable and nested field nullable on aggregations
This closes #4314.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0bad3b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0bad3b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0bad3b8
Branch: refs/heads/master
Commit: c0bad3b80d6fe67e43bc1a5d3bebbd98479e3d76
Parents: 7aa1156
Author: Rong Rong <ro...@uber.com>
Authored: Wed Jul 19 13:26:31 2017 -0700
Committer: twalthr <tw...@apache.org>
Committed: Mon Jul 24 14:06:00 2017 +0200
----------------------------------------------------------------------
.../table/api/batch/sql/AggregateTest.scala | 40 +++++++++++++++++++
.../table/api/batch/table/AggregateTest.scala | 42 ++++++++++++++++++++
2 files changed, 82 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c0bad3b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala
index 7ae7db6..445fed3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala
@@ -98,6 +98,46 @@ class AggregateTest extends TableTestBase {
}
@Test
+ def testAggregateWithFilterOnNestedFields(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, (Int, Long))]("MyTable", 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT avg(a), sum(b), count(c), sum(c._1) FROM MyTable WHERE a = 1"
+
+ val calcNode = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "CAST(1) AS a", "b", "c", "c._1 AS $f3"),
+ term("where", "=(a, 1)")
+ )
+
+ val setValues = unaryNode(
+ "DataSetValues",
+ calcNode,
+ tuples(List(null,null,null,null)),
+ term("values","a","b","c","$f3")
+ )
+
+ val union = unaryNode(
+ "DataSetUnion",
+ setValues,
+ term("union","a","b","c","$f3")
+ )
+
+ val aggregate = unaryNode(
+ "DataSetAggregate",
+ union,
+ term("select",
+ "AVG(a) AS EXPR$0",
+ "SUM(b) AS EXPR$1",
+ "COUNT(c) AS EXPR$2",
+ "SUM($f3) AS EXPR$3")
+ )
+
+ util.verifySql(sqlQuery, aggregate)
+ }
+
+ @Test
def testGroupAggregate(): Unit = {
val util = batchTestUtil()
util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
http://git-wip-us.apache.org/repos/asf/flink/blob/c0bad3b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala
index af001be..0a135d1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala
@@ -130,4 +130,46 @@ class AggregateTest extends TableTestBase {
util.verifyTable(resultTable, expected)
}
+
+ @Test
+ def testAggregateWithFilterOnNestedFields(): Unit = {
+ val util = batchTestUtil()
+ val sourceTable = util.addTable[(Int, Long, (Int, Long))]("MyTable", 'a, 'b, 'c)
+
+ val resultTable = sourceTable.select('a,'b,'c).where('a === 1)
+ .select('a.avg,'b.sum,'c.count, 'c.get("_1").sum)
+
+ val calcNode = unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ // ReduceExpressionsRule will add cast for Project node by force
+ // if the input of the Project node has constant expression.
+ term("select", "CAST(1) AS a", "b", "c", "c._1 AS $f3"),
+ term("where", "=(a, 1)")
+ )
+
+ val setValues = unaryNode(
+ "DataSetValues",
+ calcNode,
+ tuples(List(null,null,null,null)),
+ term("values","a","b","c","$f3")
+ )
+
+ val union = unaryNode(
+ "DataSetUnion",
+ setValues,
+ term("union","a","b","c","$f3")
+ )
+
+ val expected = unaryNode(
+ "DataSetAggregate",
+ union,
+ term("select",
+ "AVG(a) AS TMP_0",
+ "SUM(b) AS TMP_1",
+ "COUNT(c) AS TMP_2",
+ "SUM($f3) AS TMP_3")
+ )
+ util.verifyTable(resultTable, expected)
+ }
}