You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/24 12:07:03 UTC

[1/3] flink git commit: [FLINK-7137] [table] Rework nullability handling

Repository: flink
Updated Branches:
  refs/heads/master bb118104b -> c0bad3b80


[FLINK-7137] [table] Rework nullability handling


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7aa11565
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7aa11565
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7aa11565

Branch: refs/heads/master
Commit: 7aa115658b23c19fbcc8e3d1d83113608ebd7ce7
Parents: 2d1e08a
Author: twalthr <tw...@apache.org>
Authored: Wed Jul 19 16:23:09 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon Jul 24 13:59:15 2017 +0200

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  | 195 ++++++++++++-------
 .../calcite/RelTimeIndicatorConverter.scala     |  10 +-
 .../flink/table/codegen/ExpressionReducer.scala |  12 +-
 .../flink/table/expressions/aggregations.scala  |   2 +-
 .../apache/flink/table/expressions/array.scala  |   2 +-
 .../apache/flink/table/expressions/call.scala   |   4 +-
 .../apache/flink/table/expressions/cast.scala   |   5 +-
 .../flink/table/expressions/literals.scala      |   2 +-
 .../apache/flink/table/expressions/time.scala   |   4 +-
 .../table/functions/utils/AggSqlFunction.scala  |   4 +-
 .../functions/utils/ScalarSqlFunction.scala     |   5 +-
 .../logical/rel/LogicalWindowAggregate.scala    |   2 +-
 .../logical/FlinkLogicalWindowAggregate.scala   |   2 +-
 .../table/plan/schema/ArrayRelDataType.scala    |   3 +-
 .../plan/schema/CompositeRelDataType.scala      |  13 +-
 .../plan/schema/FlinkTableFunctionImpl.scala    |   2 +-
 .../table/plan/schema/GenericRelDataType.scala  |  11 +-
 .../plan/schema/TimeIndicatorRelDataType.scala  |   6 +-
 .../api/stream/table/GroupWindowTest.scala      |   4 +-
 .../plan/TimeIndicatorConversionTest.scala      |   4 +-
 .../runtime/batch/table/AggregateITCase.scala   |  17 ++
 21 files changed, 200 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index b63a3ad..6c23b9e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -49,48 +49,53 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   // NOTE: for future data types it might be necessary to
   // override more methods of RelDataTypeFactoryImpl
 
-  private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
-
-  def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType =
-    createTypeFromTypeInfo(typeInfo, nullable = false)
-
-  def createTypeFromTypeInfo(typeInfo: TypeInformation[_], nullable: Boolean): RelDataType = {
-    // simple type can be converted to SQL types and vice versa
-    if (isSimple(typeInfo)) {
-      val sqlType = typeInfoToSqlTypeName(typeInfo)
-      sqlType match {
-
-        case INTERVAL_YEAR_MONTH =>
-          createSqlIntervalType(
-            new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
-
-        case INTERVAL_DAY_SECOND =>
-          createSqlIntervalType(
-            new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
-
-        case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
-          if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
-            createRowtimeIndicatorType()
-          } else {
-            createProctimeIndicatorType()
-          }
-
-        case _ =>
-          createTypeWithNullability(createSqlType(sqlType), nullable)
+  private val seenTypes = mutable.HashMap[(TypeInformation[_], Boolean), RelDataType]()
+
+  def createTypeFromTypeInfo(
+      typeInfo: TypeInformation[_],
+      isNullable: Boolean)
+    : RelDataType = {
+
+      // we cannot use seenTypes for simple types,
+      // because time indicators and timestamps would be the same
+
+      val relType = if (isSimple(typeInfo)) {
+        // simple types can be converted to SQL types and vice versa
+        val sqlType = typeInfoToSqlTypeName(typeInfo)
+        sqlType match {
+
+          case INTERVAL_YEAR_MONTH =>
+            createSqlIntervalType(
+              new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
+
+          case INTERVAL_DAY_SECOND =>
+            createSqlIntervalType(
+              new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
+
+          case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
+            if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
+              createRowtimeIndicatorType()
+            } else {
+              createProctimeIndicatorType()
+            }
+
+          case _ =>
+            createSqlType(sqlType)
+        }
+      } else {
+        // advanced types require specific RelDataType
+        // for storing the original TypeInformation
+        seenTypes.getOrElseUpdate((typeInfo, isNullable), createAdvancedType(typeInfo, isNullable))
       }
-    }
-    // advanced types require specific RelDataType
-    // for storing the original TypeInformation
-    else {
-      seenTypes.getOrElseUpdate(typeInfo, canonize(createAdvancedType(typeInfo)))
-    }
+
+    createTypeWithNullability(relType, isNullable)
   }
 
   /**
     * Creates a indicator type for processing-time, but with similar properties as SQL timestamp.
     */
   def createProctimeIndicatorType(): RelDataType = {
-    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
     canonize(
       new TimeIndicatorRelDataType(
         getTypeSystem,
@@ -103,7 +108,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     * Creates a indicator type for event-time, but with similar properties as SQL timestamp.
     */
   def createRowtimeIndicatorType(): RelDataType = {
-    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
     canonize(
       new TimeIndicatorRelDataType(
         getTypeSystem,
@@ -113,6 +118,56 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   }
 
   /**
+    * Creates types that create custom [[RelDataType]]s that wrap Flink's [[TypeInformation]].
+    */
+  private def createAdvancedType(
+      typeInfo: TypeInformation[_],
+      isNullable: Boolean): RelDataType = {
+
+    val relType = typeInfo match {
+
+      case ct: CompositeType[_] =>
+        new CompositeRelDataType(ct, isNullable, this)
+
+      case pa: PrimitiveArrayTypeInfo[_] =>
+        new ArrayRelDataType(
+          pa,
+          createTypeFromTypeInfo(pa.getComponentType, isNullable = false),
+          isNullable)
+
+      case ba: BasicArrayTypeInfo[_, _] =>
+        new ArrayRelDataType(
+          ba,
+          createTypeFromTypeInfo(ba.getComponentInfo, isNullable = true),
+          isNullable)
+
+      case oa: ObjectArrayTypeInfo[_, _] =>
+        new ArrayRelDataType(
+          oa,
+          createTypeFromTypeInfo(oa.getComponentInfo, isNullable = true),
+          isNullable)
+
+      case mp: MapTypeInfo[_, _] =>
+        new MapRelDataType(
+          mp,
+          createTypeFromTypeInfo(mp.getKeyTypeInfo, isNullable = true),
+          createTypeFromTypeInfo(mp.getValueTypeInfo, isNullable = true),
+          isNullable)
+
+      case ti: TypeInformation[_] =>
+        new GenericRelDataType(
+          ti,
+          isNullable,
+          getTypeSystem.asInstanceOf[FlinkTypeSystem])
+
+      case ti@_ =>
+        throw TableException(s"Unsupported type information: $ti")
+    }
+
+    canonize(relType)
+  }
+
+  /**
     * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
     *
     * @param fieldNames field names
@@ -153,13 +208,15 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
         addedTimeAttributes += 1
       } else {
         val field = fields(i - addedTimeAttributes)
-        logicalRowTypeBuilder.add(field._1, createTypeFromTypeInfo(field._2)).nullable(true)
+        logicalRowTypeBuilder.add(field._1, createTypeFromTypeInfo(field._2, isNullable = true))
       }
     }
 
     logicalRowTypeBuilder.build
   }
 
+  // ----------------------------------------------------------------------------------------------
+
   override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
     // it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue
     // always set those to default value
@@ -170,52 +227,48 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     }
   }
 
-  override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType =
-    new ArrayRelDataType(
+  override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType = {
+    val relType = new ArrayRelDataType(
       ObjectArrayTypeInfo.getInfoFor(FlinkTypeFactory.toTypeInfo(elementType)),
       elementType,
-      true)
+      isNullable = false)
 
-  private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
-    case ct: CompositeType[_] =>
-      new CompositeRelDataType(ct, this)
+    canonize(relType)
+  }
 
-    case pa: PrimitiveArrayTypeInfo[_] =>
-      new ArrayRelDataType(pa, createTypeFromTypeInfo(pa.getComponentType), false)
+  override def createTypeWithNullability(
+      relDataType: RelDataType,
+      isNullable: Boolean): RelDataType = {
 
-    case ba: BasicArrayTypeInfo[_, _] =>
-      new ArrayRelDataType(ba, createTypeFromTypeInfo(ba.getComponentInfo), true)
+    // nullability change not necessary
+    if (relDataType.isNullable == isNullable) {
+      return canonize(relDataType)
+    }
 
-    case oa: ObjectArrayTypeInfo[_, _] =>
-      new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true)
+    // change nullability
+    val newType = relDataType match {
 
-    case mp: MapTypeInfo[_, _] =>
-      new MapRelDataType(mp, createTypeFromTypeInfo(mp.getKeyTypeInfo),
-        createTypeFromTypeInfo(mp.getValueTypeInfo), true)
+      case composite: CompositeRelDataType =>
+        new CompositeRelDataType(composite.compositeType, isNullable, this)
 
-    case ti: TypeInformation[_] =>
-      createTypeWithNullability(
-        new GenericRelDataType(ti, getTypeSystem.asInstanceOf[FlinkTypeSystem]),
-        nullable = true
-      )
+      case array: ArrayRelDataType =>
+        new ArrayRelDataType(array.typeInfo, array.getComponentType, isNullable)
 
-    case ti@_ =>
-      throw TableException(s"Unsupported type information: $ti")
-  }
+      case map: MapRelDataType =>
+        new MapRelDataType(map.typeInfo, map.keyType, map.valueType, isNullable)
+
+      case generic: GenericRelDataType =>
+        new GenericRelDataType(generic.typeInfo, isNullable, typeSystem)
+
+      case timeIndicator: TimeIndicatorRelDataType =>
+        timeIndicator
 
-  override def createTypeWithNullability(
-      relDataType: RelDataType,
-      nullable: Boolean)
-    : RelDataType = relDataType match {
-      case composite: CompositeRelDataType =>
-        // at the moment we do not care about nullability
-        canonize(composite)
-      case array: ArrayRelDataType =>
-        val elementType = createTypeWithNullability(array.getComponentType, nullable)
-        canonize(new ArrayRelDataType(array.typeInfo, elementType, nullable))
       case _ =>
-        super.createTypeWithNullability(relDataType, nullable)
+        super.createTypeWithNullability(relDataType, isNullable)
     }
+
+    canonize(newType)
+  }
 }
 
 object FlinkTypeFactory {

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index d76613e..eb14291 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -42,9 +42,9 @@ import scala.collection.mutable
 class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
 
   private val timestamp = rexBuilder
-    .getTypeFactory
-    .asInstanceOf[FlinkTypeFactory]
-    .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+      .getTypeFactory
+      .asInstanceOf[FlinkTypeFactory]
+      .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
 
   override def visit(intersect: LogicalIntersect): RelNode =
     throw new TableException("Logical intersect in a stream environment is not supported yet.")
@@ -341,7 +341,7 @@ object RelTimeIndicatorConverter {
       .getRexBuilder
       .getTypeFactory
       .asInstanceOf[FlinkTypeFactory]
-      .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+      .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
 
     // convert all time indicators types to timestamps
     val fields = rootRel.getRowType.getFieldList.map { field =>
@@ -381,7 +381,7 @@ class RexTimeIndicatorMaterializer(
   private val timestamp = rexBuilder
     .getTypeFactory
     .asInstanceOf[FlinkTypeFactory]
-    .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+    .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
 
   override def visitInputRef(inputRef: RexInputRef): RexNode = {
     // reference is interesting

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index cf36417..3e71c99 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -53,15 +53,21 @@ class ExpressionReducer(config: TableConfig)
       // we need to cast here for RexBuilder.makeLiteral
       case (SqlTypeName.DATE, e) =>
         Some(
-          rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e)
+          rexBuilder.makeCast(
+            typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO, e.getType.isNullable),
+            e)
         )
       case (SqlTypeName.TIME, e) =>
         Some(
-          rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e)
+          rexBuilder.makeCast(
+            typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO, e.getType.isNullable),
+            e)
         )
       case (SqlTypeName.TIMESTAMP, e) =>
         Some(
-          rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO), e)
+          rexBuilder.makeCast(
+            typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, e.getType.isNullable),
+            e)
         )
 
       // we don't support object literals yet, we skip those constant expressions

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
index a9901a3..c2d1bdf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
@@ -67,7 +67,7 @@ case class Sum(child: Expression) extends Aggregation {
   override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
     val returnType = relBuilder
       .getTypeFactory.asInstanceOf[FlinkTypeFactory]
-      .createTypeFromTypeInfo(resultType)
+      .createTypeFromTypeInfo(resultType, isNullable = true)
     new SqlSumAggFunction(returnType)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
index 7211733..3288478 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
@@ -38,7 +38,7 @@ case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
     val relDataType = relBuilder
       .asInstanceOf[FlinkRelBuilder]
       .getTypeFactory
-      .createTypeFromTypeInfo(resultType)
+      .createTypeFromTypeInfo(resultType, isNullable = false)
     val values = elements.map(_.toRexNode).toList.asJava
     relBuilder
       .getRexBuilder

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
index bd4fa2f..eb4b402 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -104,7 +104,7 @@ case class OverCall(
     val operator: SqlAggFunction = agg.asInstanceOf[Aggregation].getSqlAggFunction()
     val aggResultType = relBuilder
       .getTypeFactory.asInstanceOf[FlinkTypeFactory]
-      .createTypeFromTypeInfo(agg.resultType)
+      .createTypeFromTypeInfo(agg.resultType, isNullable = true)
 
     // assemble exprs by agg children
     val aggExprs = agg.asInstanceOf[Aggregation].children.map(_.toRexNode(relBuilder)).asJava
@@ -151,7 +151,7 @@ case class OverCall(
       case b: Literal =>
         val returnType = relBuilder
           .getTypeFactory.asInstanceOf[FlinkTypeFactory]
-          .createTypeFromTypeInfo(Types.DECIMAL)
+          .createTypeFromTypeInfo(Types.DECIMAL, isNullable = true)
 
         val sqlOperator = new SqlPostfixOperator(
           sqlKind.name,

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
index 312bf12..ba08ccb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
@@ -30,12 +30,13 @@ case class Cast(child: Expression, resultType: TypeInformation[_]) extends Unary
 
   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    val childRexNode = child.toRexNode
     relBuilder
       .getRexBuilder
       // we use abstract cast here because RelBuilder.cast() has to many side effects
       .makeAbstractCast(
-        typeFactory.createTypeFromTypeInfo(resultType),
-        child.toRexNode)
+        typeFactory.createTypeFromTypeInfo(resultType, childRexNode.getType.isNullable),
+        childRexNode)
   }
 
   override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
index 053e7ed..eb9c4f5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -121,7 +121,7 @@ case class Null(resultType: TypeInformation[_]) extends LeafExpression {
     val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
     rexBuilder
       .makeCast(
-        typeFactory.createTypeFromTypeInfo(resultType),
+        typeFactory.createTypeFromTypeInfo(resultType, isNullable = true),
         rexBuilder.constantNull())
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
index 250ec0a..0a02666 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -103,7 +103,9 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
 
     // TODO convert this into Table API expressions to make the code more readable
     val rexBuilder = relBuilder.getRexBuilder
-    val resultType = relBuilder.getTypeFactory().createTypeFromTypeInfo(LONG_TYPE_INFO)
+    val resultType = relBuilder
+      .getTypeFactory()
+      .createTypeFromTypeInfo(LONG_TYPE_INFO, isNullable = true)
     var result = rexBuilder.makeReinterpretCast(
       resultType,
       temporal,

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
index 4197760..b7d9991 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
@@ -97,7 +97,7 @@ object AggSqlFunction {
               s"Operand types of ${signatureToString(operandTypeInfo)} could not be inferred."))
 
         val inferredTypes = getParameterTypes(aggregateFunction, foundSignature.drop(1))
-          .map(typeFactory.createTypeFromTypeInfo)
+          .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
 
         for (i <- operandTypes.indices) {
           if (i < inferredTypes.length - 1) {
@@ -120,7 +120,7 @@ object AggSqlFunction {
 
     new SqlReturnTypeInference {
       override def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = {
-        typeFactory.createTypeFromTypeInfo(resultType)
+        typeFactory.createTypeFromTypeInfo(resultType, isNullable = true)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
index b1b45cd..0776f7a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
@@ -85,8 +85,7 @@ object ScalarSqlFunction {
               s"Expected: ${signaturesToString(scalarFunction, "eval")}")
         }
         val resultType = getResultTypeOfScalarFunction(scalarFunction, foundSignature.get)
-        val t = typeFactory.createTypeFromTypeInfo(resultType)
-        typeFactory.createTypeWithNullability(t, nullable = true)
+        typeFactory.createTypeFromTypeInfo(resultType, isNullable = true)
       }
     }
   }
@@ -111,7 +110,7 @@ object ScalarSqlFunction {
 
         val inferredTypes = scalarFunction
           .getParameterTypes(foundSignature)
-          .map(typeFactory.createTypeFromTypeInfo)
+          .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
 
         for (i <- operandTypes.indices) {
           if (i < inferredTypes.length - 1) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
index 4443d6c..81f6bf0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
@@ -76,7 +76,7 @@ class LogicalWindowAggregate(
     namedProperties.foreach { namedProp =>
       builder.add(
         namedProp.name,
-        typeFactory.createTypeFromTypeInfo(namedProp.property.resultType)
+        typeFactory.createTypeFromTypeInfo(namedProp.property.resultType, isNullable = false)
       )
     }
     builder.build()

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index 8154738..3e605e8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -81,7 +81,7 @@ class FlinkLogicalWindowAggregate(
     namedProperties.foreach { namedProp =>
       builder.add(
         namedProp.name,
-        typeFactory.createTypeFromTypeInfo(namedProp.property.resultType)
+        typeFactory.createTypeFromTypeInfo(namedProp.property.resultType, isNullable = false)
       )
     }
     builder.build()

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
index f7d9e1d..ed64c62 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
@@ -42,7 +42,8 @@ class ArrayRelDataType(
     case that: ArrayRelDataType =>
       super.equals(that) &&
         (that canEqual this) &&
-        typeInfo == that.typeInfo
+        typeInfo == that.typeInfo &&
+        isNullable == that.isNullable
     case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
index 3694cc5..e0c6b6f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
@@ -31,10 +31,12 @@ import scala.collection.JavaConverters._
   * Composite type for encapsulating Flink's [[CompositeType]].
   *
   * @param compositeType CompositeType to encapsulate
+  * @param nullable flag if type can be nullable
   * @param typeFactory Flink's type factory
   */
 class CompositeRelDataType(
     val compositeType: CompositeType[_],
+    val nullable: Boolean,
     typeFactory: FlinkTypeFactory)
   extends RelRecordType(StructKind.PEEK_FIELDS, createFieldList(compositeType, typeFactory)) {
 
@@ -46,7 +48,8 @@ class CompositeRelDataType(
     case that: CompositeRelDataType =>
       super.equals(that) &&
         (that canEqual this) &&
-        compositeType == that.compositeType
+        compositeType == that.compositeType &&
+        nullable == that.nullable
     case _ => false
   }
 
@@ -54,6 +57,8 @@ class CompositeRelDataType(
     compositeType.hashCode()
   }
 
+  override def isNullable: Boolean = nullable
+
 }
 
 object CompositeRelDataType {
@@ -73,11 +78,11 @@ object CompositeRelDataType {
         new RelDataTypeFieldImpl(
           name,
           index,
-          typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index), nullable = true))
-            .asInstanceOf[RelDataTypeField]
+          // TODO the composite type should provide the information if subtypes are nullable
+          typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index), isNullable = true)
+        ).asInstanceOf[RelDataTypeField]
       }
       .toList
       .asJava
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
index 1c05883..27fc2ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
@@ -78,7 +78,7 @@ class FlinkTableFunctionImpl[T](
     fieldNames
       .zip(fieldTypes)
       .foreach { f =>
-        builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true)
+        builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2, isNullable = true))
       }
     builder.build
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
index d93908b..84dd669 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
@@ -18,23 +18,27 @@
 
 package org.apache.flink.table.plan.schema
 
+import org.apache.calcite.rel.`type`.RelDataTypeSystem
 import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.calcite.FlinkTypeSystem
 
 /**
   * Generic type for encapsulating Flink's [[TypeInformation]].
   *
   * @param typeInfo TypeInformation to encapsulate
+  * @param nullable flag if type can be nullable
   * @param typeSystem Flink's type system
   */
 class GenericRelDataType(
     val typeInfo: TypeInformation[_],
-    typeSystem: FlinkTypeSystem)
+    val nullable: Boolean,
+    typeSystem: RelDataTypeSystem)
   extends BasicSqlType(
     typeSystem,
     SqlTypeName.ANY) {
 
+  isNullable = nullable
+
   override def toString = s"ANY($typeInfo)"
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[GenericRelDataType]
@@ -43,7 +47,8 @@ class GenericRelDataType(
     case that: GenericRelDataType =>
       super.equals(that) &&
         (that canEqual this) &&
-        typeInfo == that.typeInfo
+        typeInfo == that.typeInfo &&
+        nullable == that.nullable
     case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
index 5e27061..ace881a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
@@ -38,12 +38,14 @@ class TimeIndicatorRelDataType(
     case that: TimeIndicatorRelDataType =>
       super.equals(that) &&
         isEventTime == that.isEventTime
-    case that: BasicSqlType =>
-      super.equals(that)
     case _ => false
   }
 
   override def hashCode(): Int = {
     super.hashCode() + 42 // we change the hash code to differentiate from regular timestamps
   }
+
+  override def toString: String = {
+    s"TIME ATTRIBUTE(${if (isEventTime) "ROWTIME" else "PROCTIME"})"
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
index a024460..599c76b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTest.scala
@@ -772,8 +772,8 @@ class GroupWindowTest extends TableTestBase {
       ),
       term("select",
         "string",
-        "+(CAST(TMP_0), 1) AS s1",
-        "+(CAST(TMP_0), 3) AS s2",
+        "+(TMP_0, 1) AS s1",
+        "+(TMP_0, 3) AS s2",
         "TMP_1 AS x",
         "TMP_1 AS x2",
         "TMP_2 AS x3",

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index 90c8ea4..870025e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -161,8 +161,8 @@ class TimeIndicatorConversionTest extends TableTestBase {
         term("invocation",
           s"${func.functionIdentifier}(TIME_MATERIALIZATION($$0), TIME_MATERIALIZATION($$3), '')"),
         term("function", func),
-        term("rowType", "RecordType(TIMESTAMP(3) rowtime, BIGINT long, INTEGER int, " +
-          "TIMESTAMP(3) proctime, VARCHAR(2147483647) s)"),
+        term("rowType", "RecordType(TIME ATTRIBUTE(ROWTIME) rowtime, BIGINT long, INTEGER int, " +
+          "TIME ATTRIBUTE(PROCTIME) proctime, VARCHAR(2147483647) s)"),
         term("joinType", "INNER")
       ),
       term("select",

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa11565/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala
index 457142c..d563f96 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/AggregateITCase.scala
@@ -42,6 +42,23 @@ class AggregationsITCase(
   extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
+  def testAggregationWithCaseClass(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val inputTable = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv, 'a, 'b)
+    tEnv.registerDataSet("MyTable", inputTable)
+
+    val result = tEnv.scan("MyTable")
+      .where('a.get("_1") > 0)
+      .select('a.get("_1").avg, 'a.get("_2").sum, 'b.count)
+
+    val expected = "2,6,3"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testAggregationTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment


[2/3] flink git commit: [FLINK-7137] [table] Fix nullability for nested types

Posted by tw...@apache.org.
[FLINK-7137] [table] Fix nullability for nested types


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d1e08a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d1e08a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d1e08a0

Branch: refs/heads/master
Commit: 2d1e08a02d84f8d7cb2734e09741eae72bf63b7d
Parents: bb11810
Author: Rong Rong <ro...@uber.com>
Authored: Wed Jul 12 21:29:16 2017 -0700
Committer: twalthr <tw...@apache.org>
Committed: Mon Jul 24 13:59:15 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/table/calcite/FlinkTypeFactory.scala   | 12 +++++++++---
 .../flink/table/plan/schema/CompositeRelDataType.scala  |  2 +-
 .../flink/table/expressions/ScalarOperatorsTest.scala   | 11 +++++++++++
 .../expressions/utils/ScalarOperatorsTestBase.scala     |  6 ++++--
 4 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2d1e08a0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index eba1623..b63a3ad 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -51,7 +51,10 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
 
   private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
 
-  def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = {
+  def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType =
+    createTypeFromTypeInfo(typeInfo, nullable = false)
+
+  def createTypeFromTypeInfo(typeInfo: TypeInformation[_], nullable: Boolean): RelDataType = {
     // simple type can be converted to SQL types and vice versa
     if (isSimple(typeInfo)) {
       val sqlType = typeInfoToSqlTypeName(typeInfo)
@@ -73,7 +76,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
           }
 
         case _ =>
-          createSqlType(sqlType)
+          createTypeWithNullability(createSqlType(sqlType), nullable)
       }
     }
     // advanced types require specific RelDataType
@@ -191,7 +194,10 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
         createTypeFromTypeInfo(mp.getValueTypeInfo), true)
 
     case ti: TypeInformation[_] =>
-      new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
+      createTypeWithNullability(
+        new GenericRelDataType(ti, getTypeSystem.asInstanceOf[FlinkTypeSystem]),
+        nullable = true
+      )
 
     case ti@_ =>
       throw TableException(s"Unsupported type information: $ti")

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1e08a0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
index a60514b..3694cc5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
@@ -73,7 +73,7 @@ object CompositeRelDataType {
         new RelDataTypeFieldImpl(
           name,
           index,
-          typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index)))
+          typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index), nullable = true))
             .asInstanceOf[RelDataTypeField]
       }
       .toList

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1e08a0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
index 738413e..6cb9fa8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
@@ -138,6 +138,17 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
 
   @Test
   def testOtherExpressions(): Unit = {
+
+    // nested field null type
+    testSqlApi("CASE WHEN f13.f1 IS NULL THEN 'a' ELSE 'b' END", "a")
+    testSqlApi("CASE WHEN f13.f1 IS NOT NULL THEN 'a' ELSE 'b' END", "b")
+    testAllApis('f13.isNull, "f13.isNull", "f13 IS NULL", "false")
+    testAllApis('f13.isNotNull, "f13.isNotNull", "f13 IS NOT NULL", "true")
+    testAllApis('f13.get("f0").isNull, "f13.get('f0').isNull", "f13.f0 IS NULL", "false")
+    testAllApis('f13.get("f0").isNotNull, "f13.get('f0').isNotNull", "f13.f0 IS NOT NULL", "true")
+    testAllApis('f13.get("f1").isNull, "f13.get('f1').isNull", "f13.f1 IS NULL", "true")
+    testAllApis('f13.get("f1").isNotNull, "f13.get('f1').isNotNull", "f13.f1 IS NOT NULL", "false")
+
     // boolean literals
     testAllApis(
       true,

http://git-wip-us.apache.org/repos/asf/flink/blob/2d1e08a0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
index 2d22843..b719390 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.types.Row
 class ScalarOperatorsTestBase extends ExpressionTestBase {
 
   def testData: Row = {
-    val testData = new Row(13)
+    val testData = new Row(14)
     testData.setField(0, 1: Byte)
     testData.setField(1, 1: Short)
     testData.setField(2, 1)
@@ -41,6 +41,7 @@ class ScalarOperatorsTestBase extends ExpressionTestBase {
     testData.setField(10, "String")
     testData.setField(11, false)
     testData.setField(12, null)
+    testData.setField(13, Row.of("foo", null))
     testData
   }
 
@@ -58,7 +59,8 @@ class ScalarOperatorsTestBase extends ExpressionTestBase {
       Types.INT,
       Types.STRING,
       Types.BOOLEAN,
-      Types.BOOLEAN
+      Types.BOOLEAN,
+      Types.ROW(Types.STRING, Types.STRING)
       ).asInstanceOf[TypeInformation[Any]]
   }
 


[3/3] flink git commit: [FLINK-7137] [table] Add unit tests to verify composite type nullable and nested field nullable on aggregations

Posted by tw...@apache.org.
[FLINK-7137] [table] Add unit tests to verify composite type nullable and nested field nullable on aggregations

This closes #4314.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0bad3b8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0bad3b8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0bad3b8

Branch: refs/heads/master
Commit: c0bad3b80d6fe67e43bc1a5d3bebbd98479e3d76
Parents: 7aa1156
Author: Rong Rong <ro...@uber.com>
Authored: Wed Jul 19 13:26:31 2017 -0700
Committer: twalthr <tw...@apache.org>
Committed: Mon Jul 24 14:06:00 2017 +0200

----------------------------------------------------------------------
 .../table/api/batch/sql/AggregateTest.scala     | 40 +++++++++++++++++++
 .../table/api/batch/table/AggregateTest.scala   | 42 ++++++++++++++++++++
 2 files changed, 82 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c0bad3b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala
index 7ae7db6..445fed3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/AggregateTest.scala
@@ -98,6 +98,46 @@ class AggregateTest extends TableTestBase {
   }
 
   @Test
+  def testAggregateWithFilterOnNestedFields(): Unit = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, (Int, Long))]("MyTable", 'a, 'b, 'c)
+
+    val sqlQuery = "SELECT avg(a), sum(b), count(c), sum(c._1) FROM MyTable WHERE a = 1"
+
+    val calcNode = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "CAST(1) AS a", "b", "c", "c._1 AS $f3"),
+      term("where", "=(a, 1)")
+    )
+
+    val setValues =  unaryNode(
+      "DataSetValues",
+      calcNode,
+      tuples(List(null,null,null,null)),
+      term("values","a","b","c","$f3")
+    )
+
+    val union = unaryNode(
+      "DataSetUnion",
+      setValues,
+      term("union","a","b","c","$f3")
+    )
+
+    val aggregate = unaryNode(
+      "DataSetAggregate",
+      union,
+      term("select",
+        "AVG(a) AS EXPR$0",
+        "SUM(b) AS EXPR$1",
+        "COUNT(c) AS EXPR$2",
+        "SUM($f3) AS EXPR$3")
+    )
+
+    util.verifySql(sqlQuery, aggregate)
+  }
+
+  @Test
   def testGroupAggregate(): Unit = {
     val util = batchTestUtil()
     util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)

http://git-wip-us.apache.org/repos/asf/flink/blob/c0bad3b8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala
index af001be..0a135d1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/AggregateTest.scala
@@ -130,4 +130,46 @@ class AggregateTest extends TableTestBase {
 
     util.verifyTable(resultTable, expected)
   }
+
+  @Test
+  def testAggregateWithFilterOnNestedFields(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, (Int, Long))]("MyTable", 'a, 'b, 'c)
+
+    val resultTable = sourceTable.select('a,'b,'c).where('a === 1)
+      .select('a.avg,'b.sum,'c.count, 'c.get("_1").sum)
+
+    val calcNode = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      // ReduceExpressionsRule will add cast for Project node by force
+      // if the input of the Project node has constant expression.
+      term("select", "CAST(1) AS a", "b", "c", "c._1 AS $f3"),
+      term("where", "=(a, 1)")
+    )
+
+    val setValues =  unaryNode(
+      "DataSetValues",
+      calcNode,
+      tuples(List(null,null,null,null)),
+      term("values","a","b","c","$f3")
+    )
+
+    val union = unaryNode(
+      "DataSetUnion",
+      setValues,
+      term("union","a","b","c","$f3")
+    )
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      union,
+      term("select",
+        "AVG(a) AS TMP_0",
+        "SUM(b) AS TMP_1",
+        "COUNT(c) AS TMP_2",
+        "SUM($f3) AS TMP_3")
+    )
+    util.verifyTable(resultTable, expected)
+  }
 }