You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/10/17 08:37:05 UTC

flink git commit: [FLINK-7426] [table] Support null values in keys

Repository: flink
Updated Branches:
  refs/heads/master e79cedf23 -> 14bc62740


[FLINK-7426] [table] Support null values in keys

This closes #4732.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14bc6274
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14bc6274
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14bc6274

Branch: refs/heads/master
Commit: 14bc62740e90ecefd34f9202f4a37c883c3122e5
Parents: e79cedf
Author: twalthr <tw...@apache.org>
Authored: Wed Sep 27 13:05:44 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Oct 17 10:35:53 2017 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/types/Row.java   | 22 +++++++++--
 .../java/org/apache/flink/types/RowTest.java    | 33 ++++++++++++++++
 .../flink/table/calcite/FlinkTypeFactory.scala  | 14 +++----
 .../datastream/DataStreamGroupAggregate.scala   |  3 +-
 .../DataStreamGroupWindowAggregate.scala        | 18 +++++----
 .../datastream/DataStreamOverAggregate.scala    |  5 ++-
 .../nodes/datastream/DataStreamWindowJoin.scala |  9 ++++-
 .../flink/table/plan/schema/RowSchema.scala     |  8 ++++
 .../flink/table/runtime/CRowKeySelector.scala   | 41 ++++++++++++++++++++
 .../table/runtime/aggregate/AggregateUtil.scala |  7 ++--
 ...IncrementalAggregateTimeWindowFunction.scala |  2 +-
 .../IncrementalAggregateWindowFunction.scala    |  7 ++--
 .../table/runtime/stream/sql/JoinITCase.scala   |  5 ++-
 .../runtime/stream/table/AggregateITCase.scala  |  8 ++--
 .../stream/table/GroupWindowITCase.scala        | 27 ++++++++-----
 .../runtime/stream/table/OverWindowITCase.scala |  6 ++-
 16 files changed, 166 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-core/src/main/java/org/apache/flink/types/Row.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Row.java b/flink-core/src/main/java/org/apache/flink/types/Row.java
index 0b2120f..4783314 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Row.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Row.java
@@ -142,15 +142,29 @@ public class Row implements Serializable{
 
 	/**
 	 * Creates a new Row which copied from another row.
+	 * This method does not perform a deep copy: field values are shared with {@code row}.
 	 *
 	 * @param row The row being copied.
 	 * @return The cloned new Row
 	 */
 	public static Row copy(Row row) {
-		Row ret = new Row(row.getArity());
-		for (int i = 0; i < row.getArity(); ++i) {
-			ret.setField(i, row.getField(i));
+		final Row newRow = new Row(row.fields.length);
+		System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length);
+		return newRow;
+	}
+
+	/**
+	 * Creates a new Row with projected fields from another row, without deep-copying the values.
+	 *
+	 * @param row the row to project the fields from
+	 * @param fields positions in {@code row} of the fields to be projected, in result order
+	 * @return the new projected Row
+	 */
+	public static Row project(Row row, int[] fields) {
+		final Row newRow = new Row(fields.length);
+		for (int i = 0; i < fields.length; i++) {
+			newRow.fields[i] = row.fields[fields[i]];
 		}
-		return ret;
+		return newRow;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-core/src/test/java/org/apache/flink/types/RowTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/RowTest.java b/flink-core/src/test/java/org/apache/flink/types/RowTest.java
index 13a4d6a..067992a 100644
--- a/flink-core/src/test/java/org/apache/flink/types/RowTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/RowTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class RowTest {
 	@Test
@@ -46,4 +47,36 @@ public class RowTest {
 		row2.setField(4, true);
 		assertEquals(row1, row2);
 	}
+
+	@Test
+	public void testRowCopy() {
+		Row row = new Row(5);
+		row.setField(0, 1);
+		row.setField(1, "hello");
+		row.setField(2, null); // a null field must be carried over unchanged
+		row.setField(3, new Tuple2<>(2, "hi"));
+		row.setField(4, "hello world");
+
+		Row copy = Row.copy(row);
+		assertEquals(row, copy);
+		assertTrue(row != copy); // equal content, but a distinct Row instance
+	}
+
+	@Test
+	public void testRowProject() {
+		Row row = new Row(5);
+		row.setField(0, 1);
+		row.setField(1, "hello");
+		row.setField(2, null);
+		row.setField(3, new Tuple2<>(2, "hi"));
+		row.setField(4, "hello world");
+
+		Row projected = Row.project(row, new int[]{0, 2, 4}); // keep positions 0, 2 (null) and 4
+
+		Row expected = new Row(3);
+		expected.setField(0, 1);
+		expected.setField(1, null);
+		expected.setField(2, "hello world");
+		assertEquals(expected, projected);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 768d700..2874e61 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -149,13 +149,6 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
           createTypeFromTypeInfo(oa.getComponentInfo, isNullable = true),
           isNullable)
 
-      case mp: MapTypeInfo[_, _] =>
-        new MapRelDataType(
-          mp,
-          createTypeFromTypeInfo(mp.getKeyTypeInfo, isNullable = true),
-          createTypeFromTypeInfo(mp.getValueTypeInfo, isNullable = true),
-          isNullable)
-
       case mts: MultisetTypeInfo[_] =>
         new MultisetRelDataType(
           mts,
@@ -163,6 +156,13 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
           isNullable
         )
 
+      case mp: MapTypeInfo[_, _] =>
+        new MapRelDataType(
+          mp,
+          createTypeFromTypeInfo(mp.getKeyTypeInfo, isNullable = true),
+          createTypeFromTypeInfo(mp.getValueTypeInfo, isNullable = true),
+          isNullable)
+
       case ti: TypeInformation[_] =>
         new GenericRelDataType(
           ti,

http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 742a7e4..71de57c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.codegen.AggregationCodeGenerator
 import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
 import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.CRowKeySelector
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -144,7 +145,7 @@ class DataStreamGroupAggregate(
     // grouped / keyed aggregation
       if (groupings.nonEmpty) {
         inputDS
-        .keyBy(groupings: _*)
+        .keyBy(new CRowKeySelector(groupings, inputSchema.projectedTypeInfo(groupings)))
         .process(processFunction)
         .returns(outRowType)
         .name(keyedAggOpName)

http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index db15839..267bc3b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -22,7 +22,6 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger
@@ -34,16 +33,17 @@ import org.apache.flink.table.expressions.ExpressionUtils._
 import org.apache.flink.table.expressions.ResolvedFieldReference
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.plan.nodes.CommonAggregate
-import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate._
 import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
-import org.apache.flink.table.runtime.RowtimeProcessFunction
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.aggregate.AggregateUtil._
 import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.runtime.{CRowKeySelector, RowtimeProcessFunction}
 import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger
 import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
 
 class DataStreamGroupWindowAggregate(
     window: LogicalWindow,
@@ -189,10 +189,12 @@ class DataStreamGroupWindowAggregate(
         schema.arity,
         namedProperties)
 
-      val keyedStream = timestampedInput.keyBy(grouping: _*)
+      val keySelector = new CRowKeySelector(grouping, inputSchema.projectedTypeInfo(grouping))
+
+      val keyedStream = timestampedInput.keyBy(keySelector)
       val windowedStream =
         createKeyedWindowedStream(queryConfig, window, keyedStream)
-          .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]]
+          .asInstanceOf[WindowedStream[CRow, Row, DataStreamWindow]]
 
       val (aggFunction, accumulatorRowType, aggResultRowType) =
         AggregateUtil.createDataStreamAggregateFunction(
@@ -241,8 +243,8 @@ object DataStreamGroupWindowAggregate {
   private def createKeyedWindowedStream(
       queryConfig: StreamQueryConfig,
       groupWindow: LogicalWindow,
-      stream: KeyedStream[CRow, Tuple]):
-    WindowedStream[CRow, Tuple, _ <: DataStreamWindow] = groupWindow match {
+      stream: KeyedStream[CRow, Row]):
+    WindowedStream[CRow, Row, _ <: DataStreamWindow] = groupWindow match {
 
     case TumblingGroupWindow(_, timeField, size)
         if isProctimeAttribute(timeField) && isTimeIntervalLiteral(size)=>

http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index b9b3e3e..c41c1a9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -34,6 +34,7 @@ import org.apache.flink.table.codegen.AggregationCodeGenerator
 import org.apache.flink.table.plan.nodes.OverAggregate
 import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
 import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.CRowKeySelector
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
 import org.apache.flink.table.runtime.aggregate._
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -214,7 +215,7 @@ class DataStreamOverAggregate(
     // partitioned aggregation
       if (partitionKeys.nonEmpty) {
         inputDS
-          .keyBy(partitionKeys: _*)
+          .keyBy(new CRowKeySelector(partitionKeys, inputSchema.projectedTypeInfo(partitionKeys)))
           .process(processFunction)
           .returns(returnTypeInfo)
           .name(aggOpName)
@@ -264,7 +265,7 @@ class DataStreamOverAggregate(
     // partitioned aggregation
       if (partitionKeys.nonEmpty) {
         inputDS
-          .keyBy(partitionKeys: _*)
+          .keyBy(new CRowKeySelector(partitionKeys, inputSchema.projectedTypeInfo(partitionKeys)))
           .process(processFunction)
           .returns(returnTypeInfo)
           .name(aggOpName)

http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
index 3e23006..27f2c74 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
@@ -32,6 +32,7 @@ import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, Ta
 import org.apache.flink.table.plan.nodes.CommonJoin
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.plan.util.UpdatingPlanChecker
+import org.apache.flink.table.runtime.CRowKeySelector
 import org.apache.flink.table.runtime.join.{ProcTimeBoundedStreamInnerJoin, RowTimeBoundedStreamInnerJoin, WindowJoinUtil}
 import org.apache.flink.table.runtime.operators.KeyedCoProcessOperatorWithWatermarkDelay
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -225,7 +226,9 @@ class DataStreamWindowJoin(
 
     if (!leftKeys.isEmpty) {
       leftDataStream.connect(rightDataStream)
-        .keyBy(leftKeys, rightKeys)
+        .keyBy(
+          new CRowKeySelector(leftKeys, leftSchema.projectedTypeInfo(leftKeys)),
+          new CRowKeySelector(rightKeys, rightSchema.projectedTypeInfo(rightKeys)))
         .process(procInnerJoinFunc)
         .name(operatorName)
         .returns(returnTypeInfo)
@@ -264,7 +267,9 @@ class DataStreamWindowJoin(
     if (!leftKeys.isEmpty) {
       leftDataStream
         .connect(rightDataStream)
-        .keyBy(leftKeys, rightKeys)
+        .keyBy(
+          new CRowKeySelector(leftKeys, leftSchema.projectedTypeInfo(leftKeys)),
+          new CRowKeySelector(rightKeys, rightSchema.projectedTypeInfo(rightKeys)))
         .transform(
           operatorName,
           returnTypeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
index ad0f552..cfe6683 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
@@ -62,4 +62,12 @@ class RowSchema(private val logicalRowType: RelDataType) {
     */
   def fieldNames: Seq[String] = logicalRowType.getFieldNames
 
+  /**
+    * Returns a [[TypeInformation]] of the schema restricted to the given field indices, in order.
+    */
+  def projectedTypeInfo(fields: Array[Int]): TypeInformation[Row] = {
+    val projectedTypes = fields.map(fieldTypeInfos(_))
+    val projectedNames = fields.map(fieldNames(_))
+    new RowTypeInfo(projectedTypes, projectedNames)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala
new file mode 100644
index 0000000..216a7f9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * Null-aware key selector that projects the keyFields positions of a CRow's row into a Row key.
+  */
+class CRowKeySelector(
+    val keyFields: Array[Int],
+    @transient var returnType: TypeInformation[Row])
+  extends KeySelector[CRow, Row]
+  with ResultTypeQueryable[Row] {
+
+  override def getKey(value: CRow): Row = {
+    Row.project(value.row, keyFields)
+  }
+
+  override def getProducedType: TypeInformation[Row] = returnType
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index c84b254..2efd13d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -27,8 +27,7 @@ import org.apache.calcite.sql.fun._
 import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
 import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction, AggregateFunction => DataStreamAggFunction, _}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
@@ -981,7 +980,7 @@ object AggregateUtil {
       numAggregates: Int,
       finalRowArity: Int,
       properties: Seq[NamedWindowProperty]):
-    WindowFunction[Row, CRow, Tuple, DataStreamWindow] = {
+    WindowFunction[Row, CRow, Row, DataStreamWindow] = {
 
     if (isTimeWindow(window)) {
       val (startPos, endPos, timePos) = computeWindowPropertyPos(properties)
@@ -992,7 +991,7 @@ object AggregateUtil {
         endPos,
         timePos,
         finalRowArity)
-        .asInstanceOf[WindowFunction[Row, CRow, Tuple, DataStreamWindow]]
+        .asInstanceOf[WindowFunction[Row, CRow, Row, DataStreamWindow]]
     } else {
       new IncrementalAggregateWindowFunction(
         numGroupingKeys,

http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
index 1950230..69e4f7b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
@@ -59,7 +59,7 @@ class IncrementalAggregateTimeWindowFunction(
   }
 
   override def apply(
-      key: Tuple,
+      key: Row,
       window: TimeWindow,
       records: Iterable[Row],
       out: Collector[CRow]): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
index 7e9d738..c9fa0c9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
@@ -19,12 +19,11 @@ package org.apache.flink.table.runtime.aggregate
 
 import java.lang.Iterable
 
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
 
 /**
@@ -38,7 +37,7 @@ class IncrementalAggregateWindowFunction[W <: Window](
     private val numGroupingKey: Int,
     private val numAggregates: Int,
     private val finalRowArity: Int)
-  extends RichWindowFunction[Row, CRow, Tuple, W] {
+  extends RichWindowFunction[Row, CRow, Row, W] {
 
   private var output: CRow = _
 
@@ -51,7 +50,7 @@ class IncrementalAggregateWindowFunction[W <: Window](
     * Row based on the mapping relation between intermediate aggregate data and output data.
     */
   override def apply(
-      key: Tuple,
+      key: Row,
       window: W,
       records: Iterable[Row],
       out: Collector[CRow]): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index 119f92f..1d7bab6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -25,8 +25,9 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.expressions.Null
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
 import org.junit._
@@ -66,7 +67,9 @@ class JoinITCase extends StreamingWithStateTestBase {
     data2.+=((2, 2L, "HeHe"))
 
     val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+      .select(('a === 1)?(Null(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values
     val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+      .select(('a === 1)?(Null(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values
 
     tEnv.registerTable("T1", t1)
     tEnv.registerTable("T2", t2)

http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
index e67c784..67558d9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
@@ -23,7 +23,8 @@ import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.StreamITCase.RetractingSink
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment, Types}
+import org.apache.flink.table.expressions.Null
 import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, DataViewTestAgg}
 import org.apache.flink.table.runtime.utils.{JavaUserDefinedAggFunctions, StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
@@ -39,7 +40,6 @@ class AggregateITCase extends StreamingWithStateTestBase {
   private val queryConfig = new StreamQueryConfig()
   queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
 
-
   @Test
   def testDistinct(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -48,13 +48,13 @@ class AggregateITCase extends StreamingWithStateTestBase {
     StreamITCase.clear
 
     val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('b).distinct()
+      .select('b, Null(Types.LONG)).distinct()
 
     val results = t.toRetractStream[Row](queryConfig)
     results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
     env.execute()
 
-    val expected = mutable.MutableList("1", "2", "3", "4", "5", "6")
+    val expected = mutable.MutableList("1,null", "2,null", "3,null", "4,null", "5,null", "6,null")
     assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
index f6e739e..a9d4e44 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
@@ -61,7 +61,8 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
     (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
     (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
     (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"),
-    (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"))
+    (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"),
+    (32L, 4, 4d, 4f, new BigDecimal("4"), null.asInstanceOf[String]))
 
   @Test
   def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
@@ -232,8 +233,6 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
-
-
   // ----------------------------------------------------------------------------------------------
   // Sliding windows
   // ----------------------------------------------------------------------------------------------
@@ -270,7 +269,10 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
       "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011",
       "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007",
       "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
-      "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+      "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "1,1970-01-01 00:00:00.028,1970-01-01 00:00:00.033",
+      "1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.035",
+      "1,1970-01-01 00:00:00.032,1970-01-01 00:00:00.037")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -308,7 +310,9 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
       "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
       "Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
       "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01")
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+      "null,1,1970-01-01 00:00:00.025,1970-01-01 00:00:00.035",
+      "null,1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.04")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -343,7 +347,9 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
       "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
       "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
       "Hello,2,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "null,1,1970-01-01 00:00:00.028,1970-01-01 00:00:00.033",
+      "null,1,1970-01-01 00:00:00.032,1970-01-01 00:00:00.037")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -373,7 +379,8 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
     val expected = Seq(
       "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
       "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "null,1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.035")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -402,7 +409,8 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
 
     val expected = Seq(
       "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
+      "null,1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.033")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -430,7 +438,8 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
     env.execute()
     val expected = Seq(
       "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
+      "null,1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.033")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
index 3e6b0c6..7563dab 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
@@ -52,7 +52,8 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       (7L, 7, "Hello World"),
       (8L, 8, "Hello World"),
       (8L, 8, "Hello World"),
-      (20L, 20, "Hello World"))
+      (20L, 20, "Hello World"),
+      (20L, 20, null.asInstanceOf[String]))
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(1)
@@ -80,7 +81,8 @@ class OverWindowITCase extends StreamingWithStateTestBase {
 
     val expected = Seq(
       "Hello World,1,7,1", "Hello World,2,7,2", "Hello World,3,7,2", "Hello World,4,13,3",
-      "Hello,1,1,1", "Hello,2,1,2", "Hello,3,2,3", "Hello,4,3,4", "Hello,5,3,5", "Hello,6,4,6")
+      "Hello,1,1,1", "Hello,2,1,2", "Hello,3,2,3", "Hello,4,3,4", "Hello,5,3,5", "Hello,6,4,6",
+      "null,1,20,1")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }