You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/10/17 08:37:05 UTC
flink git commit: [FLINK-7426] [table] Support null values in keys
Repository: flink
Updated Branches:
refs/heads/master e79cedf23 -> 14bc62740
[FLINK-7426] [table] Support null values in keys
This closes #4732.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14bc6274
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14bc6274
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14bc6274
Branch: refs/heads/master
Commit: 14bc62740e90ecefd34f9202f4a37c883c3122e5
Parents: e79cedf
Author: twalthr <tw...@apache.org>
Authored: Wed Sep 27 13:05:44 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Tue Oct 17 10:35:53 2017 +0200
----------------------------------------------------------------------
.../main/java/org/apache/flink/types/Row.java | 22 +++++++++--
.../java/org/apache/flink/types/RowTest.java | 33 ++++++++++++++++
.../flink/table/calcite/FlinkTypeFactory.scala | 14 +++----
.../datastream/DataStreamGroupAggregate.scala | 3 +-
.../DataStreamGroupWindowAggregate.scala | 18 +++++----
.../datastream/DataStreamOverAggregate.scala | 5 ++-
.../nodes/datastream/DataStreamWindowJoin.scala | 9 ++++-
.../flink/table/plan/schema/RowSchema.scala | 8 ++++
.../flink/table/runtime/CRowKeySelector.scala | 41 ++++++++++++++++++++
.../table/runtime/aggregate/AggregateUtil.scala | 7 ++--
...IncrementalAggregateTimeWindowFunction.scala | 2 +-
.../IncrementalAggregateWindowFunction.scala | 7 ++--
.../table/runtime/stream/sql/JoinITCase.scala | 5 ++-
.../runtime/stream/table/AggregateITCase.scala | 8 ++--
.../stream/table/GroupWindowITCase.scala | 27 ++++++++-----
.../runtime/stream/table/OverWindowITCase.scala | 6 ++-
16 files changed, 166 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-core/src/main/java/org/apache/flink/types/Row.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Row.java b/flink-core/src/main/java/org/apache/flink/types/Row.java
index 0b2120f..4783314 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Row.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Row.java
@@ -142,15 +142,29 @@ public class Row implements Serializable{
/**
* Creates a new Row which copied from another row.
+ * This method does not perform a deep copy.
*
* @param row The row being copied.
* @return The cloned new Row
*/
public static Row copy(Row row) {
- Row ret = new Row(row.getArity());
- for (int i = 0; i < row.getArity(); ++i) {
- ret.setField(i, row.getField(i));
+ final Row newRow = new Row(row.fields.length);
+ System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length);
+ return newRow;
+ }
+
+ /**
+ * Creates a new Row with projected fields from another row.
+ * This method does not perform a deep copy.
+ *
+ * @param fields fields to be projected
+ * @return the new projected Row
+ */
+ public static Row project(Row row, int[] fields) {
+ final Row newRow = new Row(fields.length);
+ for (int i = 0; i < fields.length; i++) {
+ newRow.fields[i] = row.fields[fields[i]];
}
- return ret;
+ return newRow;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-core/src/test/java/org/apache/flink/types/RowTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/RowTest.java b/flink-core/src/test/java/org/apache/flink/types/RowTest.java
index 13a4d6a..067992a 100644
--- a/flink-core/src/test/java/org/apache/flink/types/RowTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/RowTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class RowTest {
@Test
@@ -46,4 +47,36 @@ public class RowTest {
row2.setField(4, true);
assertEquals(row1, row2);
}
+
+ @Test
+ public void testRowCopy() {
+ Row row = new Row(5);
+ row.setField(0, 1);
+ row.setField(1, "hello");
+ row.setField(2, null);
+ row.setField(3, new Tuple2<>(2, "hi"));
+ row.setField(4, "hello world");
+
+ Row copy = Row.copy(row);
+ assertEquals(row, copy);
+ assertTrue(row != copy);
+ }
+
+ @Test
+ public void testRowProject() {
+ Row row = new Row(5);
+ row.setField(0, 1);
+ row.setField(1, "hello");
+ row.setField(2, null);
+ row.setField(3, new Tuple2<>(2, "hi"));
+ row.setField(4, "hello world");
+
+ Row projected = Row.project(row, new int[]{0, 2, 4});
+
+ Row expected = new Row(3);
+ expected.setField(0, 1);
+ expected.setField(1, null);
+ expected.setField(2, "hello world");
+ assertEquals(expected, projected);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 768d700..2874e61 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -149,13 +149,6 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
createTypeFromTypeInfo(oa.getComponentInfo, isNullable = true),
isNullable)
- case mp: MapTypeInfo[_, _] =>
- new MapRelDataType(
- mp,
- createTypeFromTypeInfo(mp.getKeyTypeInfo, isNullable = true),
- createTypeFromTypeInfo(mp.getValueTypeInfo, isNullable = true),
- isNullable)
-
case mts: MultisetTypeInfo[_] =>
new MultisetRelDataType(
mts,
@@ -163,6 +156,13 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
isNullable
)
+ case mp: MapTypeInfo[_, _] =>
+ new MapRelDataType(
+ mp,
+ createTypeFromTypeInfo(mp.getKeyTypeInfo, isNullable = true),
+ createTypeFromTypeInfo(mp.getValueTypeInfo, isNullable = true),
+ isNullable)
+
case ti: TypeInformation[_] =>
new GenericRelDataType(
ti,
http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
index 742a7e4..71de57c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.codegen.AggregationCodeGenerator
import org.apache.flink.table.plan.nodes.CommonAggregate
import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.CRowKeySelector
import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
import org.apache.flink.table.runtime.aggregate._
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -144,7 +145,7 @@ class DataStreamGroupAggregate(
// grouped / keyed aggregation
if (groupings.nonEmpty) {
inputDS
- .keyBy(groupings: _*)
+ .keyBy(new CRowKeySelector(groupings, inputSchema.projectedTypeInfo(groupings)))
.process(processFunction)
.returns(outRowType)
.name(keyedAggOpName)
http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index db15839..267bc3b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -22,7 +22,6 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger
@@ -34,16 +33,17 @@ import org.apache.flink.table.expressions.ExpressionUtils._
import org.apache.flink.table.expressions.ResolvedFieldReference
import org.apache.flink.table.plan.logical._
import org.apache.flink.table.plan.nodes.CommonAggregate
-import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate._
import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
-import org.apache.flink.table.runtime.RowtimeProcessFunction
+import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.runtime.aggregate.AggregateUtil._
import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.table.runtime.{CRowKeySelector, RowtimeProcessFunction}
import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger
import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
class DataStreamGroupWindowAggregate(
window: LogicalWindow,
@@ -189,10 +189,12 @@ class DataStreamGroupWindowAggregate(
schema.arity,
namedProperties)
- val keyedStream = timestampedInput.keyBy(grouping: _*)
+ val keySelector = new CRowKeySelector(grouping, inputSchema.projectedTypeInfo(grouping))
+
+ val keyedStream = timestampedInput.keyBy(keySelector)
val windowedStream =
createKeyedWindowedStream(queryConfig, window, keyedStream)
- .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]]
+ .asInstanceOf[WindowedStream[CRow, Row, DataStreamWindow]]
val (aggFunction, accumulatorRowType, aggResultRowType) =
AggregateUtil.createDataStreamAggregateFunction(
@@ -241,8 +243,8 @@ object DataStreamGroupWindowAggregate {
private def createKeyedWindowedStream(
queryConfig: StreamQueryConfig,
groupWindow: LogicalWindow,
- stream: KeyedStream[CRow, Tuple]):
- WindowedStream[CRow, Tuple, _ <: DataStreamWindow] = groupWindow match {
+ stream: KeyedStream[CRow, Row]):
+ WindowedStream[CRow, Row, _ <: DataStreamWindow] = groupWindow match {
case TumblingGroupWindow(_, timeField, size)
if isProctimeAttribute(timeField) && isTimeIntervalLiteral(size)=>
http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index b9b3e3e..c41c1a9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -34,6 +34,7 @@ import org.apache.flink.table.codegen.AggregationCodeGenerator
import org.apache.flink.table.plan.nodes.OverAggregate
import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules
import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.CRowKeySelector
import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
import org.apache.flink.table.runtime.aggregate._
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -214,7 +215,7 @@ class DataStreamOverAggregate(
// partitioned aggregation
if (partitionKeys.nonEmpty) {
inputDS
- .keyBy(partitionKeys: _*)
+ .keyBy(new CRowKeySelector(partitionKeys, inputSchema.projectedTypeInfo(partitionKeys)))
.process(processFunction)
.returns(returnTypeInfo)
.name(aggOpName)
@@ -264,7 +265,7 @@ class DataStreamOverAggregate(
// partitioned aggregation
if (partitionKeys.nonEmpty) {
inputDS
- .keyBy(partitionKeys: _*)
+ .keyBy(new CRowKeySelector(partitionKeys, inputSchema.projectedTypeInfo(partitionKeys)))
.process(processFunction)
.returns(returnTypeInfo)
.name(aggOpName)
http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
index 3e23006..27f2c74 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala
@@ -32,6 +32,7 @@ import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, Ta
import org.apache.flink.table.plan.nodes.CommonJoin
import org.apache.flink.table.plan.schema.RowSchema
import org.apache.flink.table.plan.util.UpdatingPlanChecker
+import org.apache.flink.table.runtime.CRowKeySelector
import org.apache.flink.table.runtime.join.{ProcTimeBoundedStreamInnerJoin, RowTimeBoundedStreamInnerJoin, WindowJoinUtil}
import org.apache.flink.table.runtime.operators.KeyedCoProcessOperatorWithWatermarkDelay
import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
@@ -225,7 +226,9 @@ class DataStreamWindowJoin(
if (!leftKeys.isEmpty) {
leftDataStream.connect(rightDataStream)
- .keyBy(leftKeys, rightKeys)
+ .keyBy(
+ new CRowKeySelector(leftKeys, leftSchema.projectedTypeInfo(leftKeys)),
+ new CRowKeySelector(rightKeys, rightSchema.projectedTypeInfo(rightKeys)))
.process(procInnerJoinFunc)
.name(operatorName)
.returns(returnTypeInfo)
@@ -264,7 +267,9 @@ class DataStreamWindowJoin(
if (!leftKeys.isEmpty) {
leftDataStream
.connect(rightDataStream)
- .keyBy(leftKeys, rightKeys)
+ .keyBy(
+ new CRowKeySelector(leftKeys, leftSchema.projectedTypeInfo(leftKeys)),
+ new CRowKeySelector(rightKeys, rightSchema.projectedTypeInfo(rightKeys)))
.transform(
operatorName,
returnTypeInfo,
http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
index ad0f552..cfe6683 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala
@@ -62,4 +62,12 @@ class RowSchema(private val logicalRowType: RelDataType) {
*/
def fieldNames: Seq[String] = logicalRowType.getFieldNames
+ /**
+ * Returns a projected [[TypeInformation]] of the schema.
+ */
+ def projectedTypeInfo(fields: Array[Int]): TypeInformation[Row] = {
+ val projectedTypes = fields.map(fieldTypeInfos(_))
+ val projectedNames = fields.map(fieldNames(_))
+ new RowTypeInfo(projectedTypes, projectedNames)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala
new file mode 100644
index 0000000..216a7f9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+ * Null-aware key selector.
+ */
+class CRowKeySelector(
+ val keyFields: Array[Int],
+ @transient var returnType: TypeInformation[Row])
+ extends KeySelector[CRow, Row]
+ with ResultTypeQueryable[Row] {
+
+ override def getKey(value: CRow): Row = {
+ Row.project(value.row, keyFields)
+ }
+
+ override def getProducedType: TypeInformation[Row] = returnType
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index c84b254..2efd13d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -27,8 +27,7 @@ import org.apache.calcite.sql.fun._
import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction, AggregateFunction => DataStreamAggFunction, _}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
@@ -981,7 +980,7 @@ object AggregateUtil {
numAggregates: Int,
finalRowArity: Int,
properties: Seq[NamedWindowProperty]):
- WindowFunction[Row, CRow, Tuple, DataStreamWindow] = {
+ WindowFunction[Row, CRow, Row, DataStreamWindow] = {
if (isTimeWindow(window)) {
val (startPos, endPos, timePos) = computeWindowPropertyPos(properties)
@@ -992,7 +991,7 @@ object AggregateUtil {
endPos,
timePos,
finalRowArity)
- .asInstanceOf[WindowFunction[Row, CRow, Tuple, DataStreamWindow]]
+ .asInstanceOf[WindowFunction[Row, CRow, Row, DataStreamWindow]]
} else {
new IncrementalAggregateWindowFunction(
numGroupingKeys,
http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
index 1950230..69e4f7b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
@@ -59,7 +59,7 @@ class IncrementalAggregateTimeWindowFunction(
}
override def apply(
- key: Tuple,
+ key: Row,
window: TimeWindow,
records: Iterable[Row],
out: Collector[CRow]): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
index 7e9d738..c9fa0c9 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala
@@ -19,12 +19,11 @@ package org.apache.flink.table.runtime.aggregate
import java.lang.Iterable
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
import org.apache.flink.util.Collector
/**
@@ -38,7 +37,7 @@ class IncrementalAggregateWindowFunction[W <: Window](
private val numGroupingKey: Int,
private val numAggregates: Int,
private val finalRowArity: Int)
- extends RichWindowFunction[Row, CRow, Tuple, W] {
+ extends RichWindowFunction[Row, CRow, Row, W] {
private var output: CRow = _
@@ -51,7 +50,7 @@ class IncrementalAggregateWindowFunction[W <: Window](
* Row based on the mapping relation between intermediate aggregate data and output data.
*/
override def apply(
- key: Tuple,
+ key: Row,
window: W,
records: Iterable[Row],
out: Collector[CRow]): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
index 119f92f..1d7bab6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
@@ -25,8 +25,9 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.expressions.Null
import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
import org.apache.flink.types.Row
import org.junit._
@@ -66,7 +67,9 @@ class JoinITCase extends StreamingWithStateTestBase {
data2.+=((2, 2L, "HeHe"))
val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+ .select(('a === 1)?(Null(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values
val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+ .select(('a === 1)?(Null(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values
tEnv.registerTable("T1", t1)
tEnv.registerTable("T2", t2)
http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
index e67c784..67558d9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
@@ -23,7 +23,8 @@ import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.runtime.utils.StreamITCase.RetractingSink
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment, Types}
+import org.apache.flink.table.expressions.Null
import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, DataViewTestAgg}
import org.apache.flink.table.runtime.utils.{JavaUserDefinedAggFunctions, StreamITCase, StreamTestData, StreamingWithStateTestBase}
import org.apache.flink.types.Row
@@ -39,7 +40,6 @@ class AggregateITCase extends StreamingWithStateTestBase {
private val queryConfig = new StreamQueryConfig()
queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
-
@Test
def testDistinct(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -48,13 +48,13 @@ class AggregateITCase extends StreamingWithStateTestBase {
StreamITCase.clear
val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- .select('b).distinct()
+ .select('b, Null(Types.LONG)).distinct()
val results = t.toRetractStream[Row](queryConfig)
results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
env.execute()
- val expected = mutable.MutableList("1", "2", "3", "4", "5", "6")
+ val expected = mutable.MutableList("1,null", "2,null", "3,null", "4,null", "5,null", "6,null")
assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
index f6e739e..a9d4e44 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala
@@ -61,7 +61,8 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
(4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
(7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
(8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"),
- (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"))
+ (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"),
+ (32L, 4, 4d, 4f, new BigDecimal("4"), null.asInstanceOf[String]))
@Test
def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
@@ -232,8 +233,6 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
-
-
// ----------------------------------------------------------------------------------------------
// Sliding windows
// ----------------------------------------------------------------------------------------------
@@ -270,7 +269,10 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
"2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011",
"3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007",
"3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
- "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+ "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+ "1,1970-01-01 00:00:00.028,1970-01-01 00:00:00.033",
+ "1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.035",
+ "1,1970-01-01 00:00:00.032,1970-01-01 00:00:00.037")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@@ -308,7 +310,9 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
"Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
"Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
"Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01")
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+ "null,1,1970-01-01 00:00:00.025,1970-01-01 00:00:00.035",
+ "null,1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.04")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@@ -343,7 +347,9 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
"Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
"Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
"Hello,2,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+ "null,1,1970-01-01 00:00:00.028,1970-01-01 00:00:00.033",
+ "null,1,1970-01-01 00:00:00.032,1970-01-01 00:00:00.037")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@@ -373,7 +379,8 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
val expected = Seq(
"Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
"Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+ "null,1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.035")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@@ -402,7 +409,8 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
val expected = Seq(
"Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
+ "null,1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.033")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
@@ -430,7 +438,8 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
env.execute()
val expected = Seq(
"Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
- "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
+ "null,1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.033")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
index 3e6b0c6..7563dab 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
@@ -52,7 +52,8 @@ class OverWindowITCase extends StreamingWithStateTestBase {
(7L, 7, "Hello World"),
(8L, 8, "Hello World"),
(8L, 8, "Hello World"),
- (20L, 20, "Hello World"))
+ (20L, 20, "Hello World"),
+ (20L, 20, null.asInstanceOf[String]))
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
@@ -80,7 +81,8 @@ class OverWindowITCase extends StreamingWithStateTestBase {
val expected = Seq(
"Hello World,1,7,1", "Hello World,2,7,2", "Hello World,3,7,2", "Hello World,4,13,3",
- "Hello,1,1,1", "Hello,2,1,2", "Hello,3,2,3", "Hello,4,3,4", "Hello,5,3,5", "Hello,6,4,6")
+ "Hello,1,1,1", "Hello,2,1,2", "Hello,3,2,3", "Hello,4,3,4", "Hello,5,3,5", "Hello,6,4,6",
+ "null,1,20,1")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}