You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/24 12:31:09 UTC

[1/2] flink git commit: [FLINK-7137] [table] Rework nullability handling

Repository: flink
Updated Branches:
  refs/heads/release-1.3 45a5f875b -> 8c87c4469


[FLINK-7137] [table] Rework nullability handling


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be8ca8a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be8ca8a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be8ca8a3

Branch: refs/heads/release-1.3
Commit: be8ca8a384604a2fb2bd74886f452e4a61ce9cfb
Parents: 45a5f87
Author: twalthr <tw...@apache.org>
Authored: Wed Jul 19 16:23:09 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon Jul 24 14:14:41 2017 +0200

----------------------------------------------------------------------
 .../flink/table/calcite/FlinkTypeFactory.scala  | 189 ++++++++++++-------
 .../calcite/RelTimeIndicatorConverter.scala     |   4 +-
 .../flink/table/codegen/ExpressionReducer.scala |  12 +-
 .../flink/table/expressions/aggregations.scala  |   2 +-
 .../apache/flink/table/expressions/array.scala  |   2 +-
 .../apache/flink/table/expressions/call.scala   |   4 +-
 .../apache/flink/table/expressions/cast.scala   |   5 +-
 .../flink/table/expressions/literals.scala      |   2 +-
 .../apache/flink/table/expressions/time.scala   |   4 +-
 .../table/functions/utils/AggSqlFunction.scala  |   4 +-
 .../functions/utils/ScalarSqlFunction.scala     |   5 +-
 .../logical/rel/LogicalWindowAggregate.scala    |   2 +-
 .../logical/FlinkLogicalWindowAggregate.scala   |   2 +-
 .../table/plan/schema/ArrayRelDataType.scala    |   3 +-
 .../plan/schema/CompositeRelDataType.scala      |  13 +-
 .../plan/schema/FlinkTableFunctionImpl.scala    |   2 +-
 .../table/plan/schema/GenericRelDataType.scala  |  11 +-
 .../plan/schema/TimeIndicatorRelDataType.scala  |   6 +-
 .../scala/batch/table/AggregationsITCase.scala  |  17 ++
 .../scala/stream/table/GroupWindowTest.scala    |   4 +-
 .../calcite/RelTimeIndicatorConverterTest.scala |   4 +-
 21 files changed, 197 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index eba1623..6c23b9e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -49,45 +49,53 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   // NOTE: for future data types it might be necessary to
   // override more methods of RelDataTypeFactoryImpl
 
-  private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
-
-  def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = {
-    // simple type can be converted to SQL types and vice versa
-    if (isSimple(typeInfo)) {
-      val sqlType = typeInfoToSqlTypeName(typeInfo)
-      sqlType match {
-
-        case INTERVAL_YEAR_MONTH =>
-          createSqlIntervalType(
-            new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
-
-        case INTERVAL_DAY_SECOND =>
-          createSqlIntervalType(
-            new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
-
-        case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
-          if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
-            createRowtimeIndicatorType()
-          } else {
-            createProctimeIndicatorType()
-          }
-
-        case _ =>
-          createSqlType(sqlType)
+  private val seenTypes = mutable.HashMap[(TypeInformation[_], Boolean), RelDataType]()
+
+  def createTypeFromTypeInfo(
+      typeInfo: TypeInformation[_],
+      isNullable: Boolean)
+    : RelDataType = {
+
+      // we cannot use seenTypes for simple types,
+      // because time indicators and timestamps would be the same
+
+      val relType = if (isSimple(typeInfo)) {
+        // simple types can be converted to SQL types and vice versa
+        val sqlType = typeInfoToSqlTypeName(typeInfo)
+        sqlType match {
+
+          case INTERVAL_YEAR_MONTH =>
+            createSqlIntervalType(
+              new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
+
+          case INTERVAL_DAY_SECOND =>
+            createSqlIntervalType(
+              new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
+
+          case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
+            if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
+              createRowtimeIndicatorType()
+            } else {
+              createProctimeIndicatorType()
+            }
+
+          case _ =>
+            createSqlType(sqlType)
+        }
+      } else {
+        // advanced types require specific RelDataType
+        // for storing the original TypeInformation
+        seenTypes.getOrElseUpdate((typeInfo, isNullable), createAdvancedType(typeInfo, isNullable))
       }
-    }
-    // advanced types require specific RelDataType
-    // for storing the original TypeInformation
-    else {
-      seenTypes.getOrElseUpdate(typeInfo, canonize(createAdvancedType(typeInfo)))
-    }
+
+    createTypeWithNullability(relType, isNullable)
   }
 
   /**
     * Creates a indicator type for processing-time, but with similar properties as SQL timestamp.
     */
   def createProctimeIndicatorType(): RelDataType = {
-    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
     canonize(
       new TimeIndicatorRelDataType(
         getTypeSystem,
@@ -100,7 +108,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     * Creates a indicator type for event-time, but with similar properties as SQL timestamp.
     */
   def createRowtimeIndicatorType(): RelDataType = {
-    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+    val originalType = createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
     canonize(
       new TimeIndicatorRelDataType(
         getTypeSystem,
@@ -110,6 +118,56 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
   }
 
   /**
+    * Creates types that create custom [[RelDataType]]s that wrap Flink's [[TypeInformation]].
+    */
+  private def createAdvancedType(
+      typeInfo: TypeInformation[_],
+      isNullable: Boolean): RelDataType = {
+
+    val relType = typeInfo match {
+
+      case ct: CompositeType[_] =>
+        new CompositeRelDataType(ct, isNullable, this)
+
+      case pa: PrimitiveArrayTypeInfo[_] =>
+        new ArrayRelDataType(
+          pa,
+          createTypeFromTypeInfo(pa.getComponentType, isNullable = false),
+          isNullable)
+
+      case ba: BasicArrayTypeInfo[_, _] =>
+        new ArrayRelDataType(
+          ba,
+          createTypeFromTypeInfo(ba.getComponentInfo, isNullable = true),
+          isNullable)
+
+      case oa: ObjectArrayTypeInfo[_, _] =>
+        new ArrayRelDataType(
+          oa,
+          createTypeFromTypeInfo(oa.getComponentInfo, isNullable = true),
+          isNullable)
+
+      case mp: MapTypeInfo[_, _] =>
+        new MapRelDataType(
+          mp,
+          createTypeFromTypeInfo(mp.getKeyTypeInfo, isNullable = true),
+          createTypeFromTypeInfo(mp.getValueTypeInfo, isNullable = true),
+          isNullable)
+
+      case ti: TypeInformation[_] =>
+        new GenericRelDataType(
+          ti,
+          isNullable,
+          getTypeSystem.asInstanceOf[FlinkTypeSystem])
+
+      case ti@_ =>
+        throw TableException(s"Unsupported type information: $ti")
+    }
+
+    canonize(relType)
+  }
+
+  /**
     * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
     *
     * @param fieldNames field names
@@ -150,13 +208,15 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
         addedTimeAttributes += 1
       } else {
         val field = fields(i - addedTimeAttributes)
-        logicalRowTypeBuilder.add(field._1, createTypeFromTypeInfo(field._2)).nullable(true)
+        logicalRowTypeBuilder.add(field._1, createTypeFromTypeInfo(field._2, isNullable = true))
       }
     }
 
     logicalRowTypeBuilder.build
   }
 
+  // ----------------------------------------------------------------------------------------------
+
   override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
     // it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue
     // always set those to default value
@@ -167,49 +227,48 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
     }
   }
 
-  override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType =
-    new ArrayRelDataType(
+  override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType = {
+    val relType = new ArrayRelDataType(
       ObjectArrayTypeInfo.getInfoFor(FlinkTypeFactory.toTypeInfo(elementType)),
       elementType,
-      true)
+      isNullable = false)
 
-  private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
-    case ct: CompositeType[_] =>
-      new CompositeRelDataType(ct, this)
+    canonize(relType)
+  }
 
-    case pa: PrimitiveArrayTypeInfo[_] =>
-      new ArrayRelDataType(pa, createTypeFromTypeInfo(pa.getComponentType), false)
+  override def createTypeWithNullability(
+      relDataType: RelDataType,
+      isNullable: Boolean): RelDataType = {
 
-    case ba: BasicArrayTypeInfo[_, _] =>
-      new ArrayRelDataType(ba, createTypeFromTypeInfo(ba.getComponentInfo), true)
+    // nullability change not necessary
+    if (relDataType.isNullable == isNullable) {
+      return canonize(relDataType)
+    }
 
-    case oa: ObjectArrayTypeInfo[_, _] =>
-      new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true)
+    // change nullability
+    val newType = relDataType match {
 
-    case mp: MapTypeInfo[_, _] =>
-      new MapRelDataType(mp, createTypeFromTypeInfo(mp.getKeyTypeInfo),
-        createTypeFromTypeInfo(mp.getValueTypeInfo), true)
+      case composite: CompositeRelDataType =>
+        new CompositeRelDataType(composite.compositeType, isNullable, this)
 
-    case ti: TypeInformation[_] =>
-      new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
+      case array: ArrayRelDataType =>
+        new ArrayRelDataType(array.typeInfo, array.getComponentType, isNullable)
 
-    case ti@_ =>
-      throw TableException(s"Unsupported type information: $ti")
-  }
+      case map: MapRelDataType =>
+        new MapRelDataType(map.typeInfo, map.keyType, map.valueType, isNullable)
+
+      case generic: GenericRelDataType =>
+        new GenericRelDataType(generic.typeInfo, isNullable, typeSystem)
+
+      case timeIndicator: TimeIndicatorRelDataType =>
+        timeIndicator
 
-  override def createTypeWithNullability(
-      relDataType: RelDataType,
-      nullable: Boolean)
-    : RelDataType = relDataType match {
-      case composite: CompositeRelDataType =>
-        // at the moment we do not care about nullability
-        canonize(composite)
-      case array: ArrayRelDataType =>
-        val elementType = createTypeWithNullability(array.getComponentType, nullable)
-        canonize(new ArrayRelDataType(array.typeInfo, elementType, nullable))
       case _ =>
-        super.createTypeWithNullability(relDataType, nullable)
+        super.createTypeWithNullability(relDataType, isNullable)
     }
+
+    canonize(newType)
+  }
 }
 
 object FlinkTypeFactory {

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index b28e3f8..ac4dc13 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -44,7 +44,7 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
   private val timestamp = rexBuilder
       .getTypeFactory
       .asInstanceOf[FlinkTypeFactory]
-      .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+      .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
 
   override def visit(intersect: LogicalIntersect): RelNode =
     throw new TableException("Logical intersect in a stream environment is not supported yet.")
@@ -399,7 +399,7 @@ object RelTimeIndicatorConverter {
       .getRexBuilder
       .getTypeFactory
       .asInstanceOf[FlinkTypeFactory]
-      .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP)
+      .createTypeFromTypeInfo(SqlTimeTypeInfo.TIMESTAMP, isNullable = false)
 
     // convert all time indicators types to timestamps
     val fields = rootRel.getRowType.getFieldList.map { field =>

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index b7e1335..3241b5e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -53,15 +53,21 @@ class ExpressionReducer(config: TableConfig)
       // we need to cast here for RexBuilder.makeLiteral
       case (SqlTypeName.DATE, e) =>
         Some(
-          rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e)
+          rexBuilder.makeCast(
+            typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO, e.getType.isNullable),
+            e)
         )
       case (SqlTypeName.TIME, e) =>
         Some(
-          rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO), e)
+          rexBuilder.makeCast(
+            typeFactory.createTypeFromTypeInfo(BasicTypeInfo.INT_TYPE_INFO, e.getType.isNullable),
+            e)
         )
       case (SqlTypeName.TIMESTAMP, e) =>
         Some(
-          rexBuilder.makeCast(typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO), e)
+          rexBuilder.makeCast(
+            typeFactory.createTypeFromTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, e.getType.isNullable),
+            e)
         )
 
       // we don't support object literals yet, we skip those constant expressions

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
index 6d906b9..d38880c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
@@ -67,7 +67,7 @@ case class Sum(child: Expression) extends Aggregation {
   override private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder) = {
     val returnType = relBuilder
       .getTypeFactory.asInstanceOf[FlinkTypeFactory]
-      .createTypeFromTypeInfo(resultType)
+      .createTypeFromTypeInfo(resultType, isNullable = true)
     new SqlSumAggFunction(returnType)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
index 7211733..3288478 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
@@ -38,7 +38,7 @@ case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
     val relDataType = relBuilder
       .asInstanceOf[FlinkRelBuilder]
       .getTypeFactory
-      .createTypeFromTypeInfo(resultType)
+      .createTypeFromTypeInfo(resultType, isNullable = false)
     val values = elements.map(_.toRexNode).toList.asJava
     relBuilder
       .getRexBuilder

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
index 13f8a11..5d1a60e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -104,7 +104,7 @@ case class OverCall(
     val operator: SqlAggFunction = agg.asInstanceOf[Aggregation].getSqlAggFunction()
     val aggResultType = relBuilder
       .getTypeFactory.asInstanceOf[FlinkTypeFactory]
-      .createTypeFromTypeInfo(agg.resultType)
+      .createTypeFromTypeInfo(agg.resultType, isNullable = true)
 
     // assemble exprs by agg children
     val aggExprs = agg.asInstanceOf[Aggregation].children.map(_.toRexNode(relBuilder)).asJava
@@ -151,7 +151,7 @@ case class OverCall(
       case b: Literal =>
         val returnType = relBuilder
           .getTypeFactory.asInstanceOf[FlinkTypeFactory]
-          .createTypeFromTypeInfo(Types.DECIMAL)
+          .createTypeFromTypeInfo(Types.DECIMAL, isNullable = true)
 
         val sqlOperator = new SqlPostfixOperator(
           sqlKind.name,

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
index 312bf12..ba08ccb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
@@ -30,12 +30,13 @@ case class Cast(child: Expression, resultType: TypeInformation[_]) extends Unary
 
   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
     val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    val childRexNode = child.toRexNode
     relBuilder
       .getRexBuilder
       // we use abstract cast here because RelBuilder.cast() has to many side effects
       .makeAbstractCast(
-        typeFactory.createTypeFromTypeInfo(resultType),
-        child.toRexNode)
+        typeFactory.createTypeFromTypeInfo(resultType, childRexNode.getType.isNullable),
+        childRexNode)
   }
 
   override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
index 053e7ed..eb9c4f5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -121,7 +121,7 @@ case class Null(resultType: TypeInformation[_]) extends LeafExpression {
     val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
     rexBuilder
       .makeCast(
-        typeFactory.createTypeFromTypeInfo(resultType),
+        typeFactory.createTypeFromTypeInfo(resultType, isNullable = true),
         rexBuilder.constantNull())
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
index f09e2ad..98e3030 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -101,7 +101,9 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
 
     // TODO convert this into Table API expressions to make the code more readable
     val rexBuilder = relBuilder.getRexBuilder
-    val resultType = relBuilder.getTypeFactory().createTypeFromTypeInfo(LONG_TYPE_INFO)
+    val resultType = relBuilder
+      .getTypeFactory()
+      .createTypeFromTypeInfo(LONG_TYPE_INFO, isNullable = true)
     var result = rexBuilder.makeReinterpretCast(
       resultType,
       temporal,

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
index 816bc52..4b22de7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
@@ -92,7 +92,7 @@ object AggSqlFunction {
               s"Operand types of ${signatureToString(operandTypeInfo)} could not be inferred."))
 
         val inferredTypes = getParameterTypes(aggregateFunction, foundSignature.drop(1))
-          .map(typeFactory.createTypeFromTypeInfo)
+          .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
 
         for (i <- operandTypes.indices) {
           if (i < inferredTypes.length - 1) {
@@ -115,7 +115,7 @@ object AggSqlFunction {
 
     new SqlReturnTypeInference {
       override def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = {
-        typeFactory.createTypeFromTypeInfo(resultType)
+        typeFactory.createTypeFromTypeInfo(resultType, isNullable = true)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
index bbfa3aa..8f71d72 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
@@ -84,8 +84,7 @@ object ScalarSqlFunction {
               s"Expected: ${signaturesToString(scalarFunction, "eval")}")
         }
         val resultType = getResultTypeOfScalarFunction(scalarFunction, foundSignature.get)
-        val t = typeFactory.createTypeFromTypeInfo(resultType)
-        typeFactory.createTypeWithNullability(t, nullable = true)
+        typeFactory.createTypeFromTypeInfo(resultType, isNullable = true)
       }
     }
   }
@@ -110,7 +109,7 @@ object ScalarSqlFunction {
 
         val inferredTypes = scalarFunction
           .getParameterTypes(foundSignature)
-          .map(typeFactory.createTypeFromTypeInfo)
+          .map(typeFactory.createTypeFromTypeInfo(_, isNullable = true))
 
         for (i <- operandTypes.indices) {
           if (i < inferredTypes.length - 1) {

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
index 4443d6c..81f6bf0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
@@ -76,7 +76,7 @@ class LogicalWindowAggregate(
     namedProperties.foreach { namedProp =>
       builder.add(
         namedProp.name,
-        typeFactory.createTypeFromTypeInfo(namedProp.property.resultType)
+        typeFactory.createTypeFromTypeInfo(namedProp.property.resultType, isNullable = false)
       )
     }
     builder.build()

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index 8154738..3e605e8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -81,7 +81,7 @@ class FlinkLogicalWindowAggregate(
     namedProperties.foreach { namedProp =>
       builder.add(
         namedProp.name,
-        typeFactory.createTypeFromTypeInfo(namedProp.property.resultType)
+        typeFactory.createTypeFromTypeInfo(namedProp.property.resultType, isNullable = false)
       )
     }
     builder.build()

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
index f7d9e1d..ed64c62 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/ArrayRelDataType.scala
@@ -42,7 +42,8 @@ class ArrayRelDataType(
     case that: ArrayRelDataType =>
       super.equals(that) &&
         (that canEqual this) &&
-        typeInfo == that.typeInfo
+        typeInfo == that.typeInfo &&
+        isNullable == that.isNullable
     case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
index a60514b..e0c6b6f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/CompositeRelDataType.scala
@@ -31,10 +31,12 @@ import scala.collection.JavaConverters._
   * Composite type for encapsulating Flink's [[CompositeType]].
   *
   * @param compositeType CompositeType to encapsulate
+  * @param nullable flag if type can be nullable
   * @param typeFactory Flink's type factory
   */
 class CompositeRelDataType(
     val compositeType: CompositeType[_],
+    val nullable: Boolean,
     typeFactory: FlinkTypeFactory)
   extends RelRecordType(StructKind.PEEK_FIELDS, createFieldList(compositeType, typeFactory)) {
 
@@ -46,7 +48,8 @@ class CompositeRelDataType(
     case that: CompositeRelDataType =>
       super.equals(that) &&
         (that canEqual this) &&
-        compositeType == that.compositeType
+        compositeType == that.compositeType &&
+        nullable == that.nullable
     case _ => false
   }
 
@@ -54,6 +57,8 @@ class CompositeRelDataType(
     compositeType.hashCode()
   }
 
+  override def isNullable: Boolean = nullable
+
 }
 
 object CompositeRelDataType {
@@ -73,11 +78,11 @@ object CompositeRelDataType {
         new RelDataTypeFieldImpl(
           name,
           index,
-          typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index)))
-            .asInstanceOf[RelDataTypeField]
+          // TODO the composite type should provide the information if subtypes are nullable
+          typeFactory.createTypeFromTypeInfo(compositeType.getTypeAt(index), isNullable = true)
+        ).asInstanceOf[RelDataTypeField]
       }
       .toList
       .asJava
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
index 1c05883..27fc2ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTableFunctionImpl.scala
@@ -78,7 +78,7 @@ class FlinkTableFunctionImpl[T](
     fieldNames
       .zip(fieldTypes)
       .foreach { f =>
-        builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2)).nullable(true)
+        builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2, isNullable = true))
       }
     builder.build
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
index d93908b..84dd669 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/GenericRelDataType.scala
@@ -18,23 +18,27 @@
 
 package org.apache.flink.table.plan.schema
 
+import org.apache.calcite.rel.`type`.RelDataTypeSystem
 import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.calcite.FlinkTypeSystem
 
 /**
   * Generic type for encapsulating Flink's [[TypeInformation]].
   *
   * @param typeInfo TypeInformation to encapsulate
+  * @param nullable flag if type can be nullable
   * @param typeSystem Flink's type system
   */
 class GenericRelDataType(
     val typeInfo: TypeInformation[_],
-    typeSystem: FlinkTypeSystem)
+    val nullable: Boolean,
+    typeSystem: RelDataTypeSystem)
   extends BasicSqlType(
     typeSystem,
     SqlTypeName.ANY) {
 
+  isNullable = nullable
+
   override def toString = s"ANY($typeInfo)"
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[GenericRelDataType]
@@ -43,7 +47,8 @@ class GenericRelDataType(
     case that: GenericRelDataType =>
       super.equals(that) &&
         (that canEqual this) &&
-        typeInfo == that.typeInfo
+        typeInfo == that.typeInfo &&
+        nullable == that.nullable
     case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
index 5e27061..ace881a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TimeIndicatorRelDataType.scala
@@ -38,12 +38,14 @@ class TimeIndicatorRelDataType(
     case that: TimeIndicatorRelDataType =>
       super.equals(that) &&
         isEventTime == that.isEventTime
-    case that: BasicSqlType =>
-      super.equals(that)
     case _ => false
   }
 
   override def hashCode(): Int = {
     super.hashCode() + 42 // we change the hash code to differentiate from regular timestamps
   }
+
+  override def toString: String = {
+    s"TIME ATTRIBUTE(${if (isEventTime) "ROWTIME" else "PROCTIME"})"
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
index 45abf09..dd0f487 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
@@ -42,6 +42,23 @@ class AggregationsITCase(
   extends TableProgramsCollectionTestBase(configMode) {
 
   @Test
+  def testAggregationWithCaseClass(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val inputTable = CollectionDataSets.getSmallNestedTupleDataSet(env).toTable(tEnv, 'a, 'b)
+    tEnv.registerDataSet("MyTable", inputTable)
+
+    val result = tEnv.scan("MyTable")
+      .where('a.get("_1") > 0)
+      .select('a.get("_1").avg, 'a.get("_2").sum, 'b.count)
+
+    val expected = "2,6,3"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testAggregationTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
index 593b036..bb2a498 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
@@ -950,8 +950,8 @@ class GroupWindowTest extends TableTestBase {
       ),
       term("select",
         "string",
-        "+(CAST(TMP_0), 1) AS s1",
-        "+(CAST(TMP_0), 3) AS s2",
+        "+(TMP_0, 1) AS s1",
+        "+(TMP_0, 3) AS s2",
         "TMP_1 AS x",
         "TMP_1 AS x2",
         "TMP_2 AS x3",

http://git-wip-us.apache.org/repos/asf/flink/blob/be8ca8a3/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
index c315d94..8c004df 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
@@ -211,8 +211,8 @@ class RelTimeIndicatorConverterTest extends TableTestBase {
         term("invocation",
           s"${func.functionIdentifier}(TIME_MATERIALIZATION($$0), TIME_MATERIALIZATION($$3), '')"),
         term("function", func),
-        term("rowType", "RecordType(TIMESTAMP(3) rowtime, BIGINT long, INTEGER int, " +
-          "TIMESTAMP(3) proctime, VARCHAR(2147483647) s)"),
+        term("rowType", "RecordType(TIME ATTRIBUTE(ROWTIME) rowtime, BIGINT long, INTEGER int, " +
+          "TIME ATTRIBUTE(PROCTIME) proctime, VARCHAR(2147483647) s)"),
         term("joinType", "INNER")
       ),
       term("select",


[2/2] flink git commit: [FLINK-7137] [table] Backport of tests for Flink 1.3

Posted by tw...@apache.org.
[FLINK-7137] [table] Backport of tests for Flink 1.3


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c87c446
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c87c446
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c87c446

Branch: refs/heads/release-1.3
Commit: 8c87c44692bc27fb8018adf587715a9488947799
Parents: be8ca8a
Author: twalthr <tw...@apache.org>
Authored: Mon Jul 24 14:27:07 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon Jul 24 14:27:07 2017 +0200

----------------------------------------------------------------------
 .../table/expressions/ScalarOperatorsTest.scala    | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c87c446/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
index 0639835..5a82a8f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
@@ -138,6 +138,17 @@ class ScalarOperatorsTest extends ExpressionTestBase {
 
   @Test
   def testOtherExpressions(): Unit = {
+
+    // nested field null type
+    testSqlApi("CASE WHEN f13.f1 IS NULL THEN 'a' ELSE 'b' END", "a")
+    testSqlApi("CASE WHEN f13.f1 IS NOT NULL THEN 'a' ELSE 'b' END", "b")
+    testAllApis('f13.isNull, "f13.isNull", "f13 IS NULL", "false")
+    testAllApis('f13.isNotNull, "f13.isNotNull", "f13 IS NOT NULL", "true")
+    testAllApis('f13.get("f0").isNull, "f13.get('f0').isNull", "f13.f0 IS NULL", "false")
+    testAllApis('f13.get("f0").isNotNull, "f13.get('f0').isNotNull", "f13.f0 IS NOT NULL", "true")
+    testAllApis('f13.get("f1").isNull, "f13.get('f1').isNull", "f13.f1 IS NULL", "true")
+    testAllApis('f13.get("f1").isNotNull, "f13.get('f1').isNotNull", "f13.f1 IS NOT NULL", "false")
+
     // boolean literals
     testAllApis(
       true,
@@ -239,7 +250,7 @@ class ScalarOperatorsTest extends ExpressionTestBase {
   // ----------------------------------------------------------------------------------------------
 
   def testData = {
-    val testData = new Row(13)
+    val testData = new Row(14)
     testData.setField(0, 1: Byte)
     testData.setField(1, 1: Short)
     testData.setField(2, 1)
@@ -253,6 +264,7 @@ class ScalarOperatorsTest extends ExpressionTestBase {
     testData.setField(10, "String")
     testData.setField(11, false)
     testData.setField(12, null)
+    testData.setField(13, Row.of("foo", null))
     testData
   }
 
@@ -270,7 +282,8 @@ class ScalarOperatorsTest extends ExpressionTestBase {
       Types.INT,
       Types.STRING,
       Types.BOOLEAN,
-      Types.BOOLEAN
+      Types.BOOLEAN,
+      Types.ROW(Types.STRING, Types.STRING)
       ).asInstanceOf[TypeInformation[Any]]
   }