Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/07 12:18:28 UTC
[1/3] flink git commit: [FLINK-6033] [table] Add support for SQL UNNEST.
Repository: flink
Updated Branches:
refs/heads/master e5b65a7fc -> fe4e96a72
[FLINK-6033] [table] Add support for SQL UNNEST.
This closes #3793.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe4e96a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe4e96a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe4e96a7
Branch: refs/heads/master
Commit: fe4e96a726dd32fb948db050b975312e120e2461
Parents: 9f2293c
Author: Shuyi Chen <sh...@uber.com>
Authored: Fri Apr 21 23:48:28 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sun May 7 13:32:26 2017 +0200
----------------------------------------------------------------------
docs/dev/table_api.md | 2 +
.../flink/table/calcite/FlinkTypeFactory.scala | 5 +-
.../utils/UserDefinedFunctionUtils.scala | 6 +-
.../flink/table/plan/nodes/FlinkRelNode.scala | 3 +-
.../flink/table/plan/rules/FlinkRuleSets.scala | 3 +
.../plan/rules/logical/LogicalUnnestRule.scala | 134 +++++++++++++++++++
.../table/plan/util/ExplodeFunctionUtil.scala | 91 +++++++++++++
.../flink/table/typeutils/TypeCheckUtils.scala | 4 +-
.../table/api/scala/batch/sql/JoinITCase.scala | 54 ++++++++
.../table/api/scala/stream/sql/SqlITCase.scala | 90 +++++++++++++
10 files changed, 388 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index a77d994..d105188 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1482,6 +1482,7 @@ val result2 = tableEnv.sql(
#### Limitations
Joins, set operations, and non-windowed aggregations are not supported yet.
+`UNNEST` supports only arrays and does not support `WITH ORDINALITY` yet.
{% top %}
@@ -1690,6 +1691,7 @@ tableReference:
tablePrimary:
[ TABLE ] [ [ catalogName . ] schemaName . ] tableName
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
+ | UNNEST '(' expression ')'
values:
VALUES expression [, expression ]*
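A usage sketch for the new tablePrimary production, mirroring the tests added later in this patch: given a table T with an array column c,

  SELECT a, s FROM T, UNNEST(T.c) AS A (s)

returns one row per array element, pairing each element s with the enclosing row's a.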
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 9281ad8..eba1623 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -26,7 +26,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo._
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.ValueTypeInfo._
import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, RowTypeInfo}
@@ -180,6 +180,9 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
case pa: PrimitiveArrayTypeInfo[_] =>
new ArrayRelDataType(pa, createTypeFromTypeInfo(pa.getComponentType), false)
+ case ba: BasicArrayTypeInfo[_, _] =>
+ new ArrayRelDataType(ba, createTypeFromTypeInfo(ba.getComponentInfo), true)
+
case oa: ObjectArrayTypeInfo[_, _] =>
new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true)
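The new case makes boxed arrays such as Array[java.lang.Integer] or Array[String] unnest-able, with nullable elements (hence the 'true' flag); primitive arrays remain non-nullable. A hedged sketch of how the three array type infos arise, assuming Flink's standard type extraction:

  import org.apache.flink.api.java.typeutils.TypeExtractor

  TypeExtractor.getForClass(classOf[Array[Int]])               // PrimitiveArrayTypeInfo
  TypeExtractor.getForClass(classOf[Array[java.lang.Integer]]) // BasicArrayTypeInfo
  TypeExtractor.getForClass(classOf[Array[(Int, String)]])     // ObjectArrayTypeInfo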
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index 689bf0e..11174de 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -436,7 +436,11 @@ object UserDefinedFunctionUtils {
expected.isPrimitive && Primitives.wrap(expected) == candidate ||
candidate == classOf[Date] && (expected == classOf[Int] || expected == classOf[JInt]) ||
candidate == classOf[Time] && (expected == classOf[Int] || expected == classOf[JInt]) ||
- candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong])
+ candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong]) ||
+ (candidate.isArray &&
+ expected.isArray &&
+ candidate.getComponentType.isInstanceOf[Object] &&
+ expected.getComponentType == classOf[Object])
@throws[Exception]
def serialize(function: UserDefinedFunction): String = {
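The added clause lets a concretely typed array argument (say Array[String]) match an eval method declared over Array[Object], which the generic explode function added below relies on. A hypothetical standalone version of the check, for illustration only:

  def arrayCompatible(candidate: Class[_], expected: Class[_]): Boolean =
    candidate.isArray && expected.isArray &&
      expected.getComponentType == classOf[Object]

  arrayCompatible(classOf[Array[String]], classOf[Array[Object]]) // true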
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
index 0b244e9..8509a8e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRelNode.scala
@@ -61,7 +61,8 @@ trait FlinkRelNode extends RelNode {
val referenceExpr = getExpressionString(fa.getReferenceExpr, inFields, localExprsTable)
val field = fa.getField.getName
s"$referenceExpr.$field"
-
+ case cv: RexCorrelVariable =>
+ cv.toString
case _ =>
throw new IllegalArgumentException(s"Unknown expression type '${expr.getClass}': $expr")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index fad60fd..980dfd3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -100,6 +100,9 @@ object FlinkRuleSets {
PushProjectIntoTableSourceScanRule.INSTANCE,
PushFilterIntoTableSourceScanRule.INSTANCE,
+ // Unnest rule
+ LogicalUnnestRule.INSTANCE,
+
// translate to flink logical rel nodes
FlinkLogicalAggregate.CONVERTER,
FlinkLogicalWindowAggregate.CONVERTER,
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
new file mode 100644
index 0000000..f2d9f2a
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalUnnestRule.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import java.util.Collections
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.plan.RelOptRule._
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.`type`.{RelDataTypeFieldImpl, RelRecordType, StructKind}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.Uncollect
+import org.apache.calcite.rel.logical._
+import org.apache.calcite.sql.`type`.AbstractSqlType
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+import org.apache.flink.table.plan.schema.ArrayRelDataType
+import org.apache.flink.table.plan.util.ExplodeFunctionUtil
+
+class LogicalUnnestRule(
+ operand: RelOptRuleOperand,
+ description: String)
+ extends RelOptRule(operand, description) {
+
+ override def matches(call: RelOptRuleCall): Boolean = {
+
+ val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
+ val right = join.getRight.asInstanceOf[RelSubset].getOriginal
+
+ right match {
+ // a filter is pushed above the table function
+ case filter: LogicalFilter =>
+ filter.getInput.asInstanceOf[RelSubset].getOriginal match {
+ case u: Uncollect => !u.withOrdinality
+ case _ => false
+ }
+ case u: Uncollect => !u.withOrdinality
+ case _ => false
+ }
+ }
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+ val correlate = call.rel(0).asInstanceOf[LogicalCorrelate]
+
+ val outer = correlate.getLeft.asInstanceOf[RelSubset].getOriginal
+ val array = correlate.getRight.asInstanceOf[RelSubset].getOriginal
+
+ def convert(relNode: RelNode): RelNode = {
+ relNode match {
+ case rs: RelSubset =>
+ convert(rs.getRelList.get(0))
+
+ case f: LogicalFilter =>
+ f.copy(
+ f.getTraitSet,
+ ImmutableList.of(convert(f.getInput.asInstanceOf[RelSubset].getOriginal)))
+
+ case uc: Uncollect =>
+ // convert Uncollect into TableFunctionScan
+ val cluster = correlate.getCluster
+
+ val arrayType =
+ uc.getInput.getRowType.getFieldList.get(0).getValue.asInstanceOf[ArrayRelDataType]
+ val componentType = arrayType.getComponentType
+
+ // create table function
+ val explodeTableFunc = UserDefinedFunctionUtils.createTableSqlFunctions(
+ "explode",
+ ExplodeFunctionUtil.explodeTableFuncFromType(arrayType.typeInfo),
+ FlinkTypeFactory.toTypeInfo(arrayType.getComponentType),
+ cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]).head
+
+ // create table function call
+ val rexCall = cluster.getRexBuilder.makeCall(
+ explodeTableFunc,
+ uc.getInput.asInstanceOf[RelSubset]
+ .getOriginal.asInstanceOf[LogicalProject].getChildExps
+ )
+
+ // determine rel data type of unnest
+ val rowType = componentType match {
+ case _: AbstractSqlType =>
+ new RelRecordType(
+ StructKind.FULLY_QUALIFIED,
+ ImmutableList.of(new RelDataTypeFieldImpl("f0", 0, componentType)))
+ case _: RelRecordType => componentType
+ case _ => throw TableException(
+ s"Unsupported array component type in UNNEST: ${componentType.toString}")
+ }
+
+ // create table function scan
+ new LogicalTableFunctionScan(
+ cluster,
+ correlate.getTraitSet,
+ Collections.emptyList(),
+ rexCall,
+ classOf[Array[Object]],
+ rowType,
+ null)
+ }
+ }
+
+ // convert unnest into table function scan
+ val tableFunctionScan = convert(array)
+ // create correlate with table function scan as input
+ val newCorrelate =
+ correlate.copy(correlate.getTraitSet, ImmutableList.of(outer, tableFunctionScan))
+ call.transformTo(newCorrelate)
+ }
+}
+
+object LogicalUnnestRule {
+ val INSTANCE = new LogicalUnnestRule(
+ operand(classOf[LogicalCorrelate], any),
+ "LogicalUnnestRule")
+}
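Schematically, the rule rewrites the plan as follows (the trees are illustrative, not actual optimizer output); an optional LogicalFilter above the Uncollect is preserved on top of the new scan:

  before:  LogicalCorrelate
             +- <outer input>
             +- Uncollect(LogicalProject($cor0.c))

  after:   LogicalCorrelate
             +- <outer input>
             +- LogicalTableFunctionScan(explode($cor0.c))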
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
new file mode 100644
index 0000000..1bcc6d9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.util
+
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.table.functions.TableFunction
+
+class ObjectExplodeTableFunc extends TableFunction[Object] {
+ def eval(arr: Array[Object]): Unit = {
+ arr.foreach(collect)
+ }
+}
+
+class FloatExplodeTableFunc extends TableFunction[Float] {
+ def eval(arr: Array[Float]): Unit = {
+ arr.foreach(collect)
+ }
+}
+
+class ShortExplodeTableFunc extends TableFunction[Short] {
+ def eval(arr: Array[Short]): Unit = {
+ arr.foreach(collect)
+ }
+}
+class IntExplodeTableFunc extends TableFunction[Int] {
+ def eval(arr: Array[Int]): Unit = {
+ arr.foreach(collect)
+ }
+}
+
+class LongExplodeTableFunc extends TableFunction[Long] {
+ def eval(arr: Array[Long]): Unit = {
+ arr.foreach(collect)
+ }
+}
+
+class DoubleExplodeTableFunc extends TableFunction[Double] {
+ def eval(arr: Array[Double]): Unit = {
+ arr.foreach(collect)
+ }
+}
+
+class ByteExplodeTableFunc extends TableFunction[Byte] {
+ def eval(arr: Array[Byte]): Unit = {
+ arr.foreach(collect)
+ }
+}
+
+class BooleanExplodeTableFunc extends TableFunction[Boolean] {
+ def eval(arr: Array[Boolean]): Unit = {
+ arr.foreach(collect)
+ }
+}
+
+object ExplodeFunctionUtil {
+ def explodeTableFuncFromType(ti: TypeInformation[_]): TableFunction[_] = {
+ ti match {
+ case pat: PrimitiveArrayTypeInfo[_] => {
+ pat.getComponentType match {
+ case BasicTypeInfo.INT_TYPE_INFO => new IntExplodeTableFunc
+ case BasicTypeInfo.LONG_TYPE_INFO => new LongExplodeTableFunc
+ case BasicTypeInfo.SHORT_TYPE_INFO => new ShortExplodeTableFunc
+ case BasicTypeInfo.FLOAT_TYPE_INFO => new FloatExplodeTableFunc
+ case BasicTypeInfo.DOUBLE_TYPE_INFO => new DoubleExplodeTableFunc
+ case BasicTypeInfo.BYTE_TYPE_INFO => new ByteExplodeTableFunc
+ case BasicTypeInfo.BOOLEAN_TYPE_INFO => new BooleanExplodeTableFunc
+ }
+ }
+ case _: ObjectArrayTypeInfo[_, _] => new ObjectExplodeTableFunc
+ case _: BasicArrayTypeInfo[_, _] => new ObjectExplodeTableFunc
+ case _ => throw new UnsupportedOperationException(ti.toString + " is not supported")
+ }
+ }
+}
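A minimal usage sketch of the dispatcher; the type-info constant is Flink's, the rest is illustrative:

  import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo

  val tf = ExplodeFunctionUtil.explodeTableFuncFromType(
    PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO)
  // tf is an IntExplodeTableFunc; eval(Array(1, 2, 3)) collects three rows: 1, 2, 3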
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index 9896a8c..fea8c2a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -68,7 +68,9 @@ object TypeCheckUtils {
def isLong(dataType: TypeInformation[_]): Boolean = dataType == LONG_TYPE_INFO
def isArray(dataType: TypeInformation[_]): Boolean = dataType match {
- case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => true
+ case _: ObjectArrayTypeInfo[_, _] |
+ _: PrimitiveArrayTypeInfo[_] |
+ _: BasicArrayTypeInfo[_, _] => true
case _ => false
}
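With the extra case, all three of Flink's array type infos now count as SQL arrays. Illustrative calls, assuming the usual type-info constants:

  TypeCheckUtils.isArray(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO) // true
  TypeCheckUtils.isArray(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO)            // true (new)
  TypeCheckUtils.isArray(BasicTypeInfo.STRING_TYPE_INFO)                       // false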
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
index 9df17ad..8a8c0ce 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@@ -375,6 +376,59 @@ class JoinITCase(
Assert.assertEquals(0, result)
}
+ @Test
+ def testCrossWithUnnest(): Unit = {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val data = List(
+ (1, 1L, Array("Hi", "w")),
+ (2, 2L, Array("Hello", "k")),
+ (3, 2L, Array("Hello world", "x"))
+ )
+ val stream = env.fromCollection(data)
+ tEnv.registerDataSet("T", stream, 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) as A (s)"
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = List("1,Hi", "1,w", "2,Hello", "2,k", "3,Hello world", "3,x")
+ val results = result.toDataSet[Row].collect().toList
+ assertEquals(expected.toString(), results.sortWith(_.toString < _.toString).toString())
+ }
+
+ @Test
+ def testJoinWithUnnestOfTuple(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val data = List(
+ (1, Array((12, "45.6"), (2, "45.612"))),
+ (2, Array((13, "41.6"), (1, "45.2136"))),
+ (3, Array((18, "42.6")))
+ )
+ val stream = env.fromCollection(data)
+ tEnv.registerDataSet("T", stream, 'a, 'b)
+
+ val sqlQuery = "" +
+ "SELECT a, b, x, y " +
+ "FROM " +
+ " (SELECT a, b FROM T WHERE a < 3) as tf, " +
+ " UNNEST(tf.b) as A (x, y) " +
+ "WHERE x > a"
+
+ val result = tEnv.sql(sqlQuery)
+
+ val expected = List(
+ "1,[(12,45.6), (2,45.612)],12,45.6",
+ "1,[(12,45.6), (2,45.612)],2,45.612",
+ "2,[(13,41.6), (1,45.2136)],13,41.6").mkString(", ")
+ val results = result.toDataSet[Row].collect().map(_.toString)
+ assertEquals(expected, results.sorted.mkString(", "))
+ }
+
@Test(expected = classOf[TableException])
def testRightOuterJoinWithNonEquiJoinPredicate(): Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/fe4e96a7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 95366e1..4147358 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -190,4 +190,94 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+ @Test
+ def testUnnestPrimitiveArrayFromTable(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val data = List(
+ (1, Array(12, 45), Array(Array(12, 45))),
+ (2, Array(41, 5), Array(Array(18), Array(87))),
+ (3, Array(18, 42), Array(Array(1), Array(45)))
+ )
+ val stream = env.fromCollection(data)
+ tEnv.registerDataStream("T", stream, 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List(
+ "1,[12, 45],12",
+ "1,[12, 45],45",
+ "2,[41, 5],41",
+ "2,[41, 5],5",
+ "3,[18, 42],18",
+ "3,[18, 42],42"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testUnnestArrayOfArrayFromTable(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val data = List(
+ (1, Array(12, 45), Array(Array(12, 45))),
+ (2, Array(41, 5), Array(Array(18), Array(87))),
+ (3, Array(18, 42), Array(Array(1), Array(45)))
+ )
+ val stream = env.fromCollection(data)
+ tEnv.registerDataStream("T", stream, 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List(
+ "1,[12, 45]",
+ "2,[18]",
+ "2,[87]",
+ "3,[1]",
+ "3,[45]")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testUnnestObjectArrayFromTableWithFilter(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val data = List(
+ (1, Array((12, "45.6"), (12, "45.612"))),
+ (2, Array((13, "41.6"), (14, "45.2136"))),
+ (3, Array((18, "42.6")))
+ )
+ val stream = env.fromCollection(data)
+ tEnv.registerDataStream("T", stream, 'a, 'b)
+
+ val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List(
+ "2,[(13,41.6), (14,45.2136)],14,45.2136",
+ "3,[(18,42.6)],18,42.6")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
}
+
[3/3] flink git commit: [FLINK-6257] [table] Refactor OVER window tests.
Posted by fh...@apache.org.
[FLINK-6257] [table] Refactor OVER window tests.
This closes #3697.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f2293cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f2293cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f2293cf
Branch: refs/heads/master
Commit: 9f2293cfdab960246fe1aea1d705eea18a011761
Parents: e5b65a7
Author: sunjincheng121 <su...@gmail.com>
Authored: Fri Apr 7 19:28:19 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sun May 7 13:32:26 2017 +0200
----------------------------------------------------------------------
.../api/scala/stream/sql/OverWindowITCase.scala | 878 ++++++++++++++++
.../api/scala/stream/sql/OverWindowTest.scala | 503 ++++++++++
.../table/api/scala/stream/sql/SqlITCase.scala | 992 +------------------
.../scala/stream/sql/WindowAggregateTest.scala | 380 +------
.../table/runtime/harness/HarnessTestBase.scala | 87 ++
.../runtime/harness/OverWindowHarnessTest.scala | 974 ++++++++++++++++++
6 files changed, 2444 insertions(+), 1370 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
new file mode 100644
index 0000000..a7fe1c4
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
@@ -0,0 +1,878 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala.stream.sql.OverWindowITCase.EventTimeSourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+
+import scala.collection.mutable
+
+class OverWindowITCase extends StreamingWithStateTestBase {
+
+ val data = List(
+ (1L, 1, "Hello"),
+ (2L, 2, "Hello"),
+ (3L, 3, "Hello"),
+ (4L, 4, "Hello"),
+ (5L, 5, "Hello"),
+ (6L, 6, "Hello"),
+ (7L, 7, "Hello World"),
+ (8L, 8, "Hello World"),
+ (20L, 20, "Hello World"))
+
+ /**
+ * All aggregates must be computed on the same window.
+ */
+ @Test(expected = classOf[TableException])
+ def testMultiWindow(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+ "from T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+ }
+
+ @Test
+ def testProcTimeBoundedPartitionedRowsOver(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setParallelism(1)
+ StreamITCase.clear
+
+ val t = StreamTestData.get5TupleDataStream(env)
+ .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+ tEnv.registerTable("MyTable", t)
+
+ val sqlQuery = "SELECT a, " +
+ " SUM(c) OVER (" +
+ " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC, " +
+ " MIN(c) OVER (" +
+ " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " +
+ "FROM MyTable"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List(
+ "1,0,0",
+ "2,1,1",
+ "2,3,1",
+ "3,3,3",
+ "3,7,3",
+ "3,12,3",
+ "4,6,6",
+ "4,13,6",
+ "4,21,6",
+ "4,30,6",
+ "5,10,10",
+ "5,21,10",
+ "5,33,10",
+ "5,46,10",
+ "5,60,10")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testProcTimeBoundedNonPartitionedRowsOver(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStateBackend(getStateBackend)
+ env.setParallelism(1)
+ StreamITCase.clear
+
+ val t = StreamTestData.get5TupleDataStream(env)
+ .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+ tEnv.registerTable("MyTable", t)
+
+ val sqlQuery = "SELECT a, " +
+ " SUM(c) OVER (" +
+ " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " +
+ " MIN(c) OVER (" +
+ " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
+ "FROM MyTable"
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List(
+ "1,0,0",
+ "2,1,0",
+ "2,3,0",
+ "3,6,0",
+ "3,10,0",
+ "3,15,0",
+ "4,21,0",
+ "4,28,0",
+ "4,36,0",
+ "4,45,0",
+ "5,55,0",
+ "5,66,1",
+ "5,77,2",
+ "5,88,3",
+ "5,99,4")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testProcTimeUnboundedPartitionedRangeOver(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ // for sum aggregation ensure that every time the order of each element is consistent
+ env.setParallelism(1)
+
+ val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+ "from T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List(
+ "Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
+ "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testProcTimeUnboundedPartitionedRowsOver(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
+ "c, " +
+ "count(a) " +
+ " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+ "from T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List(
+ "Hello World,1", "Hello World,2", "Hello World,3",
+ "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testProcTimeUnboundedNonPartitionedRangeOver(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ // for sum aggregation ensure that every time the order of each element is consistent
+ env.setParallelism(1)
+
+ val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
+ "c, " +
+ "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+ "from T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List(
+ "Hello World,7,28", "Hello World,8,36", "Hello World,9,56",
+ "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testProcTimeUnboundedNonPartitionedRowsOver(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
+ "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+ "from T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testRowTimeBoundedPartitionedRangeOver(): Unit = {
+ val data = Seq(
+ Left((1500L, (1L, 15, "Hello"))),
+ Left((1600L, (1L, 16, "Hello"))),
+ Left((1000L, (1L, 1, "Hello"))),
+ Left((2000L, (2L, 2, "Hello"))),
+ Right(1000L),
+ Left((2000L, (2L, 2, "Hello"))),
+ Left((2000L, (2L, 3, "Hello"))),
+ Left((3000L, (3L, 3, "Hello"))),
+ Right(2000L),
+ Left((4000L, (4L, 4, "Hello"))),
+ Right(3000L),
+ Left((5000L, (5L, 5, "Hello"))),
+ Right(5000L),
+ Left((6000L, (6L, 6, "Hello"))),
+ Left((6500L, (6L, 65, "Hello"))),
+ Right(7000L),
+ Left((9000L, (6L, 9, "Hello"))),
+ Left((9500L, (6L, 18, "Hello"))),
+ Left((9000L, (6L, 9, "Hello"))),
+ Right(10000L),
+ Left((10000L, (7L, 7, "Hello World"))),
+ Left((11000L, (7L, 17, "Hello World"))),
+ Left((11000L, (7L, 77, "Hello World"))),
+ Right(12000L),
+ Left((14000L, (7L, 18, "Hello World"))),
+ Right(14000L),
+ Left((15000L, (8L, 8, "Hello World"))),
+ Right(17000L),
+ Left((20000L, (20L, 20, "Hello World"))),
+ Right(19000L))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t1 = env
+ .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
+ " c, b, " +
+ " COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
+ " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
+ " SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
+ " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
+ " FROM T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List(
+ "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
+ "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
+ "Hello,3,4,9",
+ "Hello,4,2,7",
+ "Hello,5,2,9",
+ "Hello,6,2,11", "Hello,65,2,12",
+ "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
+ "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
+ "Hello World,8,2,15",
+ "Hello World,20,1,20")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testRowTimeBoundedPartitionedRowsOver(): Unit = {
+ val data = Seq(
+ Left((1L, (1L, 1, "Hello"))),
+ Left((2L, (2L, 2, "Hello"))),
+ Left((1L, (1L, 1, "Hello"))),
+ Left((2L, (2L, 2, "Hello"))),
+ Left((2L, (2L, 2, "Hello"))),
+ Left((1L, (1L, 1, "Hello"))),
+ Left((3L, (7L, 7, "Hello World"))),
+ Left((1L, (7L, 7, "Hello World"))),
+ Left((1L, (7L, 7, "Hello World"))),
+ Right(2L),
+ Left((3L, (3L, 3, "Hello"))),
+ Left((4L, (4L, 4, "Hello"))),
+ Left((5L, (5L, 5, "Hello"))),
+ Left((6L, (6L, 6, "Hello"))),
+ Left((20L, (20L, 20, "Hello World"))),
+ Right(6L),
+ Left((8L, (8L, 8, "Hello World"))),
+ Left((7L, (7L, 7, "Hello World"))),
+ Right(20L))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t1 = env
+ .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
+ " c, a, " +
+ " COUNT(a) " +
+ " OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " +
+ " SUM(a) " +
+ " OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
+ "FROM T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List(
+ "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
+ "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
+ "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
+ "Hello,6,3,15",
+ "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
+ "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testRowTimeBoundedNonPartitionedRangeOver(): Unit = {
+ val data = Seq(
+ Left((1500L, (1L, 15, "Hello"))),
+ Left((1600L, (1L, 16, "Hello"))),
+ Left((1000L, (1L, 1, "Hello"))),
+ Left((2000L, (2L, 2, "Hello"))),
+ Right(1000L),
+ Left((2000L, (2L, 2, "Hello"))),
+ Left((2000L, (2L, 3, "Hello"))),
+ Left((3000L, (3L, 3, "Hello"))),
+ Right(2000L),
+ Left((4000L, (4L, 4, "Hello"))),
+ Right(3000L),
+ Left((5000L, (5L, 5, "Hello"))),
+ Right(5000L),
+ Left((6000L, (6L, 6, "Hello"))),
+ Left((6500L, (6L, 65, "Hello"))),
+ Right(7000L),
+ Left((9000L, (6L, 9, "Hello"))),
+ Left((9500L, (6L, 18, "Hello"))),
+ Left((9000L, (6L, 9, "Hello"))),
+ Right(10000L),
+ Left((10000L, (7L, 7, "Hello World"))),
+ Left((11000L, (7L, 17, "Hello World"))),
+ Left((11000L, (7L, 77, "Hello World"))),
+ Right(12000L),
+ Left((14000L, (7L, 18, "Hello World"))),
+ Right(14000L),
+ Left((15000L, (8L, 8, "Hello World"))),
+ Right(17000L),
+ Left((20000L, (20L, 20, "Hello World"))),
+ Right(19000L))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t1 = env
+ .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
+ " c, b, " +
+ " COUNT(a) " +
+ " OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
+ " SUM(a) " +
+ " OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
+ " FROM T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List(
+ "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
+ "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
+ "Hello,3,4,9",
+ "Hello,4,2,7",
+ "Hello,5,2,9",
+ "Hello,6,2,11", "Hello,65,2,12",
+ "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
+ "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
+ "Hello World,8,2,15",
+ "Hello World,20,1,20")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testRowTimeBoundedNonPartitionedRowsOver(): Unit = {
+
+ val data = Seq(
+ Left((2L, (2L, 2, "Hello"))),
+ Left((2L, (2L, 2, "Hello"))),
+ Left((1L, (1L, 1, "Hello"))),
+ Left((1L, (1L, 1, "Hello"))),
+ Left((2L, (2L, 2, "Hello"))),
+ Left((1L, (1L, 1, "Hello"))),
+ Left((20L, (20L, 20, "Hello World"))), // early row
+ Right(3L),
+ Left((2L, (2L, 2, "Hello"))), // late row
+ Left((3L, (3L, 3, "Hello"))),
+ Left((4L, (4L, 4, "Hello"))),
+ Left((5L, (5L, 5, "Hello"))),
+ Left((6L, (6L, 6, "Hello"))),
+ Left((7L, (7L, 7, "Hello World"))),
+ Right(7L),
+ Left((9L, (9L, 9, "Hello World"))),
+ Left((8L, (8L, 8, "Hello World"))),
+ Left((8L, (8L, 8, "Hello World"))),
+ Right(20L))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t1 = env
+ .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
+ "c, a, " +
+ " COUNT(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW), " +
+ " SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
+ "FROM T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List(
+ "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
+ "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
+ "Hello,3,3,7",
+ "Hello,4,3,9", "Hello,5,3,12",
+ "Hello,6,3,15", "Hello World,7,3,18",
+ "Hello World,8,3,21", "Hello World,8,3,23",
+ "Hello World,9,3,25",
+ "Hello World,20,3,37")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ env.setParallelism(1)
+ StreamITCase.clear
+
+ val sqlQuery = "SELECT a, b, c, " +
+ " SUM(b) OVER (" +
+ " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+ " COUNT(b) OVER (" +
+ " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+ " AVG(b) OVER (" +
+ " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+ " MAX(b) OVER (" +
+ " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+ " MIN(b) OVER (" +
+ " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+ "FROM T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000002L, (1, 1L, "Hello")),
+ Left(14000002L, (1, 2L, "Hello")),
+ Left(14000002L, (1, 3L, "Hello world")),
+ Left(14000003L, (2, 2L, "Hello world")),
+ Left(14000003L, (2, 3L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (1, 4L, "Hello world")),
+ Left(14000022L, (1, 5L, "Hello world")),
+ Left(14000022L, (1, 6L, "Hello world")),
+ Left(14000022L, (1, 7L, "Hello world")),
+ Left(14000023L, (2, 4L, "Hello world")),
+ Left(14000023L, (2, 5L, "Hello world")),
+ Right(14000030L))
+
+ val t1 = env
+ .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List(
+ "1,1,Hello,6,3,2,3,1",
+ "1,2,Hello,6,3,2,3,1",
+ "1,3,Hello world,6,3,2,3,1",
+ "1,1,Hi,7,4,1,3,1",
+ "2,1,Hello,1,1,1,1,1",
+ "2,2,Hello world,6,3,2,3,1",
+ "2,3,Hello world,6,3,2,3,1",
+ "1,4,Hello world,11,5,2,4,1",
+ "1,5,Hello world,29,8,3,7,1",
+ "1,6,Hello world,29,8,3,7,1",
+ "1,7,Hello world,29,8,3,7,1",
+ "2,4,Hello world,15,5,3,5,1",
+ "2,5,Hello world,15,5,3,5,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testRowTimeUnBoundedPartitionedRowsOver(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val sqlQuery = "SELECT a, b, c, " +
+ "SUM(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
+ "count(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
+ "avg(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
+ "max(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
+ "min(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row) " +
+ "from T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000002L, (3, 1L, "Hello")),
+ Left(14000003L, (1, 2L, "Hello")),
+ Left(14000004L, (1, 3L, "Hello world")),
+ Left(14000007L, (3, 2L, "Hello world")),
+ Left(14000008L, (2, 2L, "Hello world")),
+ Right(14000010L),
+ Left(14000012L, (1, 5L, "Hello world")),
+ Left(14000021L, (1, 6L, "Hello world")),
+ Left(14000023L, (2, 5L, "Hello world")),
+ Right(14000020L),
+ Left(14000024L, (3, 5L, "Hello world")),
+ Left(14000026L, (1, 7L, "Hello world")),
+ Left(14000025L, (1, 8L, "Hello world")),
+ Left(14000022L, (1, 9L, "Hello world")),
+ Right(14000030L))
+
+ val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,2,Hello,2,1,2,2,2",
+ "1,3,Hello world,5,2,2,3,2",
+ "1,1,Hi,6,3,2,3,1",
+ "2,1,Hello,1,1,1,1,1",
+ "2,2,Hello world,3,2,1,2,1",
+ "3,1,Hello,1,1,1,1,1",
+ "3,2,Hello world,3,2,1,2,1",
+ "1,5,Hello world,11,4,2,5,1",
+ "1,6,Hello world,17,5,3,6,1",
+ "1,9,Hello world,26,6,4,9,1",
+ "1,8,Hello world,34,7,4,9,1",
+ "1,7,Hello world,41,8,5,9,1",
+ "2,5,Hello world,8,3,2,5,1",
+ "3,5,Hello world,8,3,2,5,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testRowTimeUnBoundedNonPartitionedRangeOver(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ env.setParallelism(1)
+ StreamITCase.clear
+
+ val sqlQuery = "SELECT a, b, c, " +
+ " SUM(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+ " COUNT(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+ " AVG(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+ " MAX(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+ " MIN(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+ "FROM T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000002L, (1, 1L, "Hello")),
+ Left(14000002L, (1, 2L, "Hello")),
+ Left(14000002L, (1, 3L, "Hello world")),
+ Left(14000003L, (2, 2L, "Hello world")),
+ Left(14000003L, (2, 3L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (1, 4L, "Hello world")),
+ Left(14000022L, (1, 5L, "Hello world")),
+ Left(14000022L, (1, 6L, "Hello world")),
+ Left(14000022L, (1, 7L, "Hello world")),
+ Left(14000023L, (2, 4L, "Hello world")),
+ Left(14000023L, (2, 5L, "Hello world")),
+ Right(14000030L))
+
+ val t1 = env
+ .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List(
+ "2,1,Hello,1,1,1,1,1",
+ "1,1,Hello,7,4,1,3,1",
+ "1,2,Hello,7,4,1,3,1",
+ "1,3,Hello world,7,4,1,3,1",
+ "2,2,Hello world,12,6,2,3,1",
+ "2,3,Hello world,12,6,2,3,1",
+ "1,1,Hi,13,7,1,3,1",
+ "1,4,Hello world,17,8,2,4,1",
+ "1,5,Hello world,35,11,3,7,1",
+ "1,6,Hello world,35,11,3,7,1",
+ "1,7,Hello world,35,11,3,7,1",
+ "2,4,Hello world,44,13,3,7,1",
+ "2,5,Hello world,44,13,3,7,1")
+
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testRowTimeUnBoundedNonPartitionedRowsOver(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.clear
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT a, b, c, " +
+ " SUM(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+ " COUNT(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+ " AVG(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+ " MAX(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+ " MIN(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+ "FROM T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 2L, "Hello")),
+ Left(14000002L, (3, 5L, "Hello")),
+ Left(14000003L, (1, 3L, "Hello")),
+ Left(14000004L, (3, 7L, "Hello world")),
+ Left(14000007L, (4, 9L, "Hello world")),
+ Left(14000008L, (5, 8L, "Hello world")),
+ Right(14000010L),
+ // this element will be discard because it is late
+ Left(14000008L, (6, 8L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (6, 8L, "Hello world")),
+ Right(14000030L)
+ )
+
+ val t1 = env
+ .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "2,2,Hello,2,1,2,2,2",
+ "3,5,Hello,7,2,3,5,2",
+ "1,3,Hello,10,3,3,5,2",
+ "3,7,Hello world,17,4,4,7,2",
+ "1,1,Hi,18,5,3,7,1",
+ "4,9,Hello world,27,6,4,9,1",
+ "5,8,Hello world,35,7,5,9,1",
+ "6,8,Hello world,43,8,5,9,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+
+ /** Tests event-time unbounded OVER window with PARTITION BY. */
+ @Test
+ def testRowTimeUnBoundedPartitionedRowsOver2(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.clear
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT a, b, c, " +
+ "SUM(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
+ "count(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
+ "avg(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
+ "max(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
+ "min(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row) " +
+ "from T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000002L, (3, 1L, "Hello")),
+ Left(14000003L, (1, 2L, "Hello")),
+ Left(14000004L, (1, 3L, "Hello world")),
+ Left(14000007L, (3, 2L, "Hello world")),
+ Left(14000008L, (2, 2L, "Hello world")),
+ Right(14000010L),
+ // the next 3 elements are late
+ Left(14000008L, (1, 4L, "Hello world")),
+ Left(14000008L, (2, 3L, "Hello world")),
+ Left(14000008L, (3, 3L, "Hello world")),
+ Left(14000012L, (1, 5L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (1, 6L, "Hello world")),
+ // the next 3 elements are late
+ Left(14000019L, (1, 6L, "Hello world")),
+ Left(14000018L, (2, 4L, "Hello world")),
+ Left(14000018L, (3, 4L, "Hello world")),
+ Left(14000022L, (2, 5L, "Hello world")),
+ Left(14000022L, (3, 5L, "Hello world")),
+ Left(14000024L, (1, 7L, "Hello world")),
+ Left(14000023L, (1, 8L, "Hello world")),
+ Left(14000021L, (1, 9L, "Hello world")),
+ Right(14000030L)
+ )
+
+ val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = List(
+ "1,2,Hello,2,1,2,2,2",
+ "1,3,Hello world,5,2,2,3,2",
+ "1,1,Hi,6,3,2,3,1",
+ "2,1,Hello,1,1,1,1,1",
+ "2,2,Hello world,3,2,1,2,1",
+ "3,1,Hello,1,1,1,1,1",
+ "3,2,Hello world,3,2,1,2,1",
+ "1,5,Hello world,11,4,2,5,1",
+ "1,6,Hello world,17,5,3,6,1",
+ "1,9,Hello world,26,6,4,9,1",
+ "1,8,Hello world,34,7,4,9,1",
+ "1,7,Hello world,41,8,5,9,1",
+ "2,5,Hello world,8,3,2,5,1",
+ "3,5,Hello world,8,3,2,5,1"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+}
+
+object OverWindowITCase {
+
+ class EventTimeSourceFunction[T](
+ dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
+ override def run(ctx: SourceContext[T]): Unit = {
+ dataWithTimestampList.foreach {
+ case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
+ case Right(w) => ctx.emitWatermark(new Watermark(w))
+ }
+ }
+
+ override def cancel(): Unit = {} // bounded source, nothing to cancel
+ }
+
+}
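For orientation when reading the test data above: the source emits Left((timestamp, element)) as a timestamped record and Right(watermark) as a watermark. For example (illustrative):

  val data = Seq(
    Left((1000L, (1L, 1, "Hello"))), // record (1L, 1, "Hello") at event time 1000
    Right(2000L))                    // watermark advancing event time to 2000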
http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
new file mode 100644
index 0000000..711b31b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+class OverWindowTest extends TableTestBase {
+ private val streamUtil: StreamTableTestUtil = streamTestUtil()
+ streamUtil.addTable[(Int, String, Long)](
+ "MyTable",
+ 'a, 'b, 'c,
+ 'proctime.proctime, 'rowtime.rowtime)
+
+ @Test
+ def testProcTimeBoundedPartitionedRowsOver() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
+ "CURRENT ROW) as cnt1 " +
+ "from MyTable"
+
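+    // verifySql compares the translated DataStream plan of the query with the expected tree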
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "proctime")
+ ),
+ term("partitionBy", "c"),
+ term("orderBy", "proctime"),
+ term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
+ ),
+ term("select", "c", "w0$o0 AS $1")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testProcTimeBoundedPartitionedRangeOver() = {
+
+ val sqlQuery =
+ "SELECT a, " +
+ " AVG(c) OVER (PARTITION BY a ORDER BY proctime " +
+ " RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
+ "FROM MyTable"
+
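+    // AVG is decomposed into $SUM0 and COUNT with a CASE guard for empty windows;
+    // the 2-hour interval shows up as 7200000 ms in the range bound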
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "proctime")
+ ),
+ term("partitionBy", "a"),
+ term("orderBy", "proctime"),
+ term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
+ term(
+ "select",
+ "a",
+ "c",
+ "proctime",
+ "COUNT(c) AS w0$o0",
+ "$SUM0(c) AS w0$o1"
+ )
+ ),
+ term("select", "a", "/(CASE(>(w0$o0, 0)", "CAST(w0$o1), null), w0$o0) AS avgA")
+ )
+
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testProcTimeBoundedNonPartitionedRangeOver() = {
+
+ val sqlQuery =
+ "SELECT a, " +
+ " COUNT(c) OVER (ORDER BY proctime " +
+ " RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA " +
+ "FROM MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "proctime")
+ ),
+ term("orderBy", "proctime"),
+ term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0")
+ ),
+ term("select", "a", "w0$o0 AS $1")
+ )
+
+ streamUtil.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testProcTimeBoundedNonPartitionedRowsOver() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
+ "CURRENT ROW) as cnt1 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "proctime")
+ ),
+ term("orderBy", "proctime"),
+ term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
+ ),
+ term("select", "c", "w0$o0 AS $1")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+
+ @Test
+ def testProcTimeUnboundedPartitionedRangeOver() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "proctime")
+ ),
+ term("partitionBy", "c"),
+ term("orderBy", "proctime"),
+ term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+ ),
+ term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testProcTimeUnboundedPartitionedRowsOver() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
+ "CURRENT ROW) as cnt1 " +
+ "from MyTable"
+
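+    // the plan keeps all source columns here, so no projecting Calc precedes the aggregate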
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ streamTableNode(0),
+ term("partitionBy", "c"),
+ term("orderBy", "proctime"),
+ term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+ term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
+ ),
+ term("select", "c", "w0$o0 AS $1")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testProcTimeUnboundedNonPartitionedRangeOver() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "proctime")
+ ),
+ term("orderBy", "proctime"),
+ term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+ ),
+ term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testProcTimeUnboundedNonPartitionedRowsOver() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
+ "CURRENT ROW) as cnt1 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ streamTableNode(0),
+ term("orderBy", "proctime"),
+ term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+ term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
+ ),
+ term("select", "c", "w0$o0 AS $1")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testRowTimeBoundedPartitionedRowsOver() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
+ "CURRENT ROW) as cnt1 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "rowtime")
+ ),
+ term("partitionBy", "c"),
+ term("orderBy", "rowtime"),
+ term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
+ ),
+ term("select", "c", "w0$o0 AS $1")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testRowTimeBoundedPartitionedRangeOver() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY rowtime " +
+ "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " +
+ "from MyTable"
+
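+    // INTERVAL '1' SECOND is translated to a 1000 ms range bound in the plan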
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "rowtime")
+ ),
+ term("partitionBy", "c"),
+ term("orderBy", "rowtime"),
+ term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
+ ),
+ term("select", "c", "w0$o0 AS $1")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testRowTimeBoundedNonPartitionedRowsOver() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
+ "CURRENT ROW) as cnt1 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "rowtime")
+ ),
+ term("orderBy", "rowtime"),
+ term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
+ ),
+ term("select", "c", "w0$o0 AS $1")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testRowTimeBoundedNonPartitionedRangeOver() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (ORDER BY rowtime " +
+ "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "rowtime")
+ ),
+ term("orderBy", "rowtime"),
+ term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
+ ),
+ term("select", "c", "w0$o0 AS $1")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testRowTimeUnboundedPartitionedRangeOver() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "rowtime")
+ ),
+ term("partitionBy", "c"),
+ term("orderBy", "rowtime"),
+ term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+ ),
+ term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testRowTimeUnboundedPartitionedRowsOver() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (PARTITION BY c ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt2 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "rowtime")
+ ),
+ term("partitionBy", "c"),
+ term("orderBy", "rowtime"),
+ term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+ term(
+ "select",
+ "a",
+ "c",
+ "rowtime",
+ "COUNT(a) AS w0$o0",
+ "$SUM0(a) AS w0$o1"
+ )
+ ),
+ term(
+ "select",
+ "c",
+ "w0$o0 AS cnt1",
+ "CASE(>(w0$o0, 0)",
+ "CAST(w0$o1), null) AS cnt2"
+ )
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testRowTimeUnboundedNonPartitionedRangeOver() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "rowtime")
+ ),
+ term("orderBy", "rowtime"),
+ term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+ term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+ ),
+ term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ @Test
+ def testRowTimeUnboundedNonPartitionedRowsOver() = {
+ val sql = "SELECT " +
+ "c, " +
+ "count(a) OVER (ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (ORDER BY rowtime ROWS UNBOUNDED preceding) as cnt2 " +
+ "from MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "c", "rowtime")
+ ),
+ term("orderBy", "rowtime"),
+ term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
+ term(
+ "select",
+ "a",
+ "c",
+ "rowtime",
+ "COUNT(a) AS w0$o0",
+ "$SUM0(a) AS w0$o1"
+ )
+ ),
+ term(
+ "select",
+ "c",
+ "w0$o0 AS cnt1",
+ "CASE(>(w0$o0, 0)",
+ "CAST(w0$o1), null) AS cnt2"
+ )
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 249d505..95366e1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -19,34 +19,16 @@
package org.apache.flink.table.api.scala.stream.sql
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.stream.sql.SqlITCase.EventTimeSourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-
-import scala.collection.mutable
class SqlITCase extends StreamingWithStateTestBase {
- val data = List(
- (1L, 1, "Hello"),
- (2L, 2, "Hello"),
- (3L, 3, "Hello"),
- (4L, 4, "Hello"),
- (5L, 5, "Hello"),
- (6L, 6, "Hello"),
- (7L, 7, "Hello World"),
- (8L, 8, "Hello World"),
- (20L, 20, "Hello World"))
-
/** test unbounded groupby (without window) **/
@Test
def testUnboundedGroupby(): Unit = {
@@ -208,976 +190,4 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
- @Test
- def testUnboundPartitionedProcessingWindowWithRange(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- // for the sum aggregation, ensure that the element order is consistent across runs
- env.setParallelism(1)
-
- val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
- "from T1"
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
- "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testUnboundPartitionedProcessingWindowWithRow(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
- "CURRENT ROW)" +
- "from T1"
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "Hello World,1", "Hello World,2", "Hello World,3",
- "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testUnboundNonPartitionedProcessingWindowWithRange(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- // for the sum aggregation, ensure that the element order is consistent across runs
- env.setParallelism(1)
-
- val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- "c, " +
- "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
- "from T1"
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "Hello World,7,28", "Hello World,8,36", "Hello World,9,56",
- "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testUnboundNonPartitionedProcessingWindowWithRow(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
- "from T1"
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testBoundPartitionedEventTimeWindowWithRow(): Unit = {
- val data = Seq(
- Left((1L, (1L, 1, "Hello"))),
- Left((2L, (2L, 2, "Hello"))),
- Left((1L, (1L, 1, "Hello"))),
- Left((2L, (2L, 2, "Hello"))),
- Left((2L, (2L, 2, "Hello"))),
- Left((1L, (1L, 1, "Hello"))),
- Left((3L, (7L, 7, "Hello World"))),
- Left((1L, (7L, 7, "Hello World"))),
- Left((1L, (7L, 7, "Hello World"))),
- Right(2L),
- Left((3L, (3L, 3, "Hello"))),
- Left((4L, (4L, 4, "Hello"))),
- Left((5L, (5L, 5, "Hello"))),
- Left((6L, (6L, 6, "Hello"))),
- Left((20L, (20L, 20, "Hello World"))),
- Right(6L),
- Left((8L, (8L, 8, "Hello World"))),
- Left((7L, (7L, 7, "Hello World"))),
- Right(20L))
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t1 = env
- .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- "c, a, " +
- "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
- ", sum(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
- " from T1"
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
- "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
- "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
- "Hello,6,3,15",
- "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
- "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testBoundNonPartitionedEventTimeWindowWithRow(): Unit = {
-
- val data = Seq(
- Left((2L, (2L, 2, "Hello"))),
- Left((2L, (2L, 2, "Hello"))),
- Left((1L, (1L, 1, "Hello"))),
- Left((1L, (1L, 1, "Hello"))),
- Left((2L, (2L, 2, "Hello"))),
- Left((1L, (1L, 1, "Hello"))),
- Left((20L, (20L, 20, "Hello World"))), // early row
- Right(3L),
- Left((2L, (2L, 2, "Hello"))), // late row
- Left((3L, (3L, 3, "Hello"))),
- Left((4L, (4L, 4, "Hello"))),
- Left((5L, (5L, 5, "Hello"))),
- Left((6L, (6L, 6, "Hello"))),
- Left((7L, (7L, 7, "Hello World"))),
- Right(7L),
- Left((9L, (9L, 9, "Hello World"))),
- Left((8L, (8L, 8, "Hello World"))),
- Left((8L, (8L, 8, "Hello World"))),
- Right(20L))
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- env.setParallelism(1)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t1 = env
- .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- "c, a, " +
- "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)," +
- "sum(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
- "from T1"
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
- "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
- "Hello,3,3,7",
- "Hello,4,3,9", "Hello,5,3,12",
- "Hello,6,3,15", "Hello World,7,3,18",
- "Hello World,8,3,21", "Hello World,8,3,23",
- "Hello World,9,3,25",
- "Hello World,20,3,37")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testBoundPartitionedEventTimeWindowWithRange(): Unit = {
- val data = Seq(
- Left((1500L, (1L, 15, "Hello"))),
- Left((1600L, (1L, 16, "Hello"))),
- Left((1000L, (1L, 1, "Hello"))),
- Left((2000L, (2L, 2, "Hello"))),
- Right(1000L),
- Left((2000L, (2L, 2, "Hello"))),
- Left((2000L, (2L, 3, "Hello"))),
- Left((3000L, (3L, 3, "Hello"))),
- Right(2000L),
- Left((4000L, (4L, 4, "Hello"))),
- Right(3000L),
- Left((5000L, (5L, 5, "Hello"))),
- Right(5000L),
- Left((6000L, (6L, 6, "Hello"))),
- Left((6500L, (6L, 65, "Hello"))),
- Right(7000L),
- Left((9000L, (6L, 9, "Hello"))),
- Left((9500L, (6L, 18, "Hello"))),
- Left((9000L, (6L, 9, "Hello"))),
- Right(10000L),
- Left((10000L, (7L, 7, "Hello World"))),
- Left((11000L, (7L, 17, "Hello World"))),
- Left((11000L, (7L, 77, "Hello World"))),
- Right(12000L),
- Left((14000L, (7L, 18, "Hello World"))),
- Right(14000L),
- Left((15000L, (8L, 8, "Hello World"))),
- Right(17000L),
- Left((20000L, (20L, 20, "Hello World"))),
- Right(19000L))
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t1 = env
- .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- "c, b, " +
- "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
- "preceding AND CURRENT ROW)" +
- ", sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
- " preceding AND CURRENT ROW)" +
- " from T1"
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
- "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
- "Hello,3,4,9",
- "Hello,4,2,7",
- "Hello,5,2,9",
- "Hello,6,2,11", "Hello,65,2,12",
- "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
- "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
- "Hello World,8,2,15",
- "Hello World,20,1,20")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testBoundNonPartitionedEventTimeWindowWithRange(): Unit = {
- val data = Seq(
- Left((1500L, (1L, 15, "Hello"))),
- Left((1600L, (1L, 16, "Hello"))),
- Left((1000L, (1L, 1, "Hello"))),
- Left((2000L, (2L, 2, "Hello"))),
- Right(1000L),
- Left((2000L, (2L, 2, "Hello"))),
- Left((2000L, (2L, 3, "Hello"))),
- Left((3000L, (3L, 3, "Hello"))),
- Right(2000L),
- Left((4000L, (4L, 4, "Hello"))),
- Right(3000L),
- Left((5000L, (5L, 5, "Hello"))),
- Right(5000L),
- Left((6000L, (6L, 6, "Hello"))),
- Left((6500L, (6L, 65, "Hello"))),
- Right(7000L),
- Left((9000L, (6L, 9, "Hello"))),
- Left((9500L, (6L, 18, "Hello"))),
- Left((9000L, (6L, 9, "Hello"))),
- Right(10000L),
- Left((10000L, (7L, 7, "Hello World"))),
- Left((11000L, (7L, 17, "Hello World"))),
- Left((11000L, (7L, 77, "Hello World"))),
- Right(12000L),
- Left((14000L, (7L, 18, "Hello World"))),
- Right(14000L),
- Left((15000L, (8L, 8, "Hello World"))),
- Right(17000L),
- Left((20000L, (20L, 20, "Hello World"))),
- Right(19000L))
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t1 = env
- .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- "c, b, " +
- "count(a) OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
- "preceding AND CURRENT ROW)" +
- ", sum(a) OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
- " preceding AND CURRENT ROW)" +
- " from T1"
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
- "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
- "Hello,3,4,9",
- "Hello,4,2,7",
- "Hello,5,2,9",
- "Hello,6,2,11", "Hello,65,2,12",
- "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
- "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
- "Hello World,8,2,15",
- "Hello World,20,1,20")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /**
- * All aggregates must be computed on the same window.
- */
- @Test(expected = classOf[TableException])
- def testMultiWindow(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
- "from T1"
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
- }
-
- /** test sliding event-time unbounded window with partition by **/
- @Test
- def testUnboundedEventTimeRowWindowWithPartition(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- StreamITCase.clear
- env.setParallelism(1)
-
- val sqlQuery = "SELECT a, b, c, " +
- "SUM(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row), " +
- "count(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row), " +
- "avg(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row), " +
- "max(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row), " +
- "min(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row) " +
- "from T1"
-
- val data = Seq(
- Left(14000005L, (1, 1L, "Hi")),
- Left(14000000L, (2, 1L, "Hello")),
- Left(14000002L, (3, 1L, "Hello")),
- Left(14000003L, (1, 2L, "Hello")),
- Left(14000004L, (1, 3L, "Hello world")),
- Left(14000007L, (3, 2L, "Hello world")),
- Left(14000008L, (2, 2L, "Hello world")),
- Right(14000010L),
- // the next 3 elements are late
- Left(14000008L, (1, 4L, "Hello world")),
- Left(14000008L, (2, 3L, "Hello world")),
- Left(14000008L, (3, 3L, "Hello world")),
- Left(14000012L, (1, 5L, "Hello world")),
- Right(14000020L),
- Left(14000021L, (1, 6L, "Hello world")),
- // the next 3 elements are late
- Left(14000019L, (1, 6L, "Hello world")),
- Left(14000018L, (2, 4L, "Hello world")),
- Left(14000018L, (3, 4L, "Hello world")),
- Left(14000022L, (2, 5L, "Hello world")),
- Left(14000022L, (3, 5L, "Hello world")),
- Left(14000024L, (1, 7L, "Hello world")),
- Left(14000023L, (1, 8L, "Hello world")),
- Left(14000021L, (1, 9L, "Hello world")),
- Right(14000030L)
- )
-
- val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "1,2,Hello,2,1,2,2,2",
- "1,3,Hello world,5,2,2,3,2",
- "1,1,Hi,6,3,2,3,1",
- "2,1,Hello,1,1,1,1,1",
- "2,2,Hello world,3,2,1,2,1",
- "3,1,Hello,1,1,1,1,1",
- "3,2,Hello world,3,2,1,2,1",
- "1,5,Hello world,11,4,2,5,1",
- "1,6,Hello world,17,5,3,6,1",
- "1,9,Hello world,26,6,4,9,1",
- "1,8,Hello world,34,7,4,9,1",
- "1,7,Hello world,41,8,5,9,1",
- "2,5,Hello world,8,3,2,5,1",
- "3,5,Hello world,8,3,2,5,1"
- )
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test sliding event-time unbounded window with partition by **/
- @Test
- def testUnboundedEventTimeRowWindowWithPartitionMultiThread(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- StreamITCase.clear
-
- val sqlQuery = "SELECT a, b, c, " +
- "SUM(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row), " +
- "count(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row), " +
- "avg(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row), " +
- "max(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row), " +
- "min(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row) " +
- "from T1"
-
- val data = Seq(
- Left(14000005L, (1, 1L, "Hi")),
- Left(14000000L, (2, 1L, "Hello")),
- Left(14000002L, (3, 1L, "Hello")),
- Left(14000003L, (1, 2L, "Hello")),
- Left(14000004L, (1, 3L, "Hello world")),
- Left(14000007L, (3, 2L, "Hello world")),
- Left(14000008L, (2, 2L, "Hello world")),
- Right(14000010L),
- Left(14000012L, (1, 5L, "Hello world")),
- Right(14000020L),
- Left(14000021L, (1, 6L, "Hello world")),
- Left(14000023L, (2, 5L, "Hello world")),
- Left(14000024L, (3, 5L, "Hello world")),
- Left(14000026L, (1, 7L, "Hello world")),
- Left(14000025L, (1, 8L, "Hello world")),
- Left(14000022L, (1, 9L, "Hello world")),
- Right(14000030L)
- )
-
- val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "1,2,Hello,2,1,2,2,2",
- "1,3,Hello world,5,2,2,3,2",
- "1,1,Hi,6,3,2,3,1",
- "2,1,Hello,1,1,1,1,1",
- "2,2,Hello world,3,2,1,2,1",
- "3,1,Hello,1,1,1,1,1",
- "3,2,Hello world,3,2,1,2,1",
- "1,5,Hello world,11,4,2,5,1",
- "1,6,Hello world,17,5,3,6,1",
- "1,9,Hello world,26,6,4,9,1",
- "1,8,Hello world,34,7,4,9,1",
- "1,7,Hello world,41,8,5,9,1",
- "2,5,Hello world,8,3,2,5,1",
- "3,5,Hello world,8,3,2,5,1"
- )
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test sliding event-time unbounded window without partition by **/
- @Test
- def testUnboundedEventTimeRowWindowWithoutPartition(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- StreamITCase.clear
- env.setParallelism(1)
-
- val sqlQuery = "SELECT a, b, c, " +
- "SUM(b) over (order by rowtime rows between unbounded preceding and current row), " +
- "count(b) over (order by rowtime rows between unbounded preceding and current row), " +
- "avg(b) over (order by rowtime rows between unbounded preceding and current row), " +
- "max(b) over (order by rowtime rows between unbounded preceding and current row), " +
- "min(b) over (order by rowtime rows between unbounded preceding and current row) " +
- "from T1"
-
- val data = Seq(
- Left(14000005L, (1, 1L, "Hi")),
- Left(14000000L, (2, 2L, "Hello")),
- Left(14000002L, (3, 5L, "Hello")),
- Left(14000003L, (1, 3L, "Hello")),
- Left(14000004L, (3, 7L, "Hello world")),
- Left(14000007L, (4, 9L, "Hello world")),
- Left(14000008L, (5, 8L, "Hello world")),
- Right(14000010L),
- // this element will be discarded because it is late
- Left(14000008L, (6, 8L, "Hello world")),
- Right(14000020L),
- Left(14000021L, (6, 8L, "Hello world")),
- Right(14000030L)
- )
-
- val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "2,2,Hello,2,1,2,2,2",
- "3,5,Hello,7,2,3,5,2",
- "1,3,Hello,10,3,3,5,2",
- "3,7,Hello world,17,4,4,7,2",
- "1,1,Hi,18,5,3,7,1",
- "4,9,Hello world,27,6,4,9,1",
- "5,8,Hello world,35,7,5,9,1",
- "6,8,Hello world,43,8,5,9,1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test sliding event-time unbounded window without partition by and early arriving rows **/
- @Test
- def testUnboundedEventTimeRowWindowArriveEarly(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- StreamITCase.clear
- env.setParallelism(1)
-
- val sqlQuery = "SELECT a, b, c, " +
- "SUM(b) over (order by rowtime rows between unbounded preceding and current row), " +
- "count(b) over (order by rowtime rows between unbounded preceding and current row), " +
- "avg(b) over (order by rowtime rows between unbounded preceding and current row), " +
- "max(b) over (order by rowtime rows between unbounded preceding and current row), " +
- "min(b) over (order by rowtime rows between unbounded preceding and current row) " +
- "from T1"
-
- val data = Seq(
- Left(14000005L, (1, 1L, "Hi")),
- Left(14000000L, (2, 2L, "Hello")),
- Left(14000002L, (3, 5L, "Hello")),
- Left(14000003L, (1, 3L, "Hello")),
- // next three elements are early
- Left(14000012L, (3, 7L, "Hello world")),
- Left(14000013L, (4, 9L, "Hello world")),
- Left(14000014L, (5, 8L, "Hello world")),
- Right(14000010L),
- Left(14000011L, (6, 8L, "Hello world")),
- // next element is early
- Left(14000021L, (6, 8L, "Hello world")),
- Right(14000020L),
- Right(14000030L)
- )
-
- val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "2,2,Hello,2,1,2,2,2",
- "3,5,Hello,7,2,3,5,2",
- "1,3,Hello,10,3,3,5,2",
- "1,1,Hi,11,4,2,5,1",
- "6,8,Hello world,19,5,3,8,1",
- "3,7,Hello world,26,6,4,8,1",
- "4,9,Hello world,35,7,5,9,1",
- "5,8,Hello world,43,8,5,9,1",
- "6,8,Hello world,51,9,5,9,1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test sliding event-time non-partitioned unbounded RANGE window **/
- @Test
- def testUnboundedNonPartitionedEventTimeRangeWindow(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- StreamITCase.clear
- env.setParallelism(1)
-
- val sqlQuery = "SELECT a, b, c, " +
- "SUM(b) over (order by rowtime range between unbounded preceding and current row), " +
- "count(b) over (order by rowtime range between unbounded preceding and current row), " +
- "avg(b) over (order by rowtime range between unbounded preceding and current row), " +
- "max(b) over (order by rowtime range between unbounded preceding and current row), " +
- "min(b) over (order by rowtime range between unbounded preceding and current row) " +
- "from T1"
-
- val data = Seq(
- Left(14000005L, (1, 1L, "Hi")),
- Left(14000000L, (2, 1L, "Hello")),
- Left(14000002L, (1, 1L, "Hello")),
- Left(14000002L, (1, 2L, "Hello")),
- Left(14000002L, (1, 3L, "Hello world")),
- Left(14000003L, (2, 2L, "Hello world")),
- Left(14000003L, (2, 3L, "Hello world")),
- Right(14000020L),
- Left(14000021L, (1, 4L, "Hello world")),
- Left(14000022L, (1, 5L, "Hello world")),
- Left(14000022L, (1, 6L, "Hello world")),
- Left(14000022L, (1, 7L, "Hello world")),
- Left(14000023L, (2, 4L, "Hello world")),
- Left(14000023L, (2, 5L, "Hello world")),
- Right(14000030L)
- )
-
- val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "2,1,Hello,1,1,1,1,1",
- "1,1,Hello,7,4,1,3,1",
- "1,2,Hello,7,4,1,3,1",
- "1,3,Hello world,7,4,1,3,1",
- "2,2,Hello world,12,6,2,3,1",
- "2,3,Hello world,12,6,2,3,1",
- "1,1,Hi,13,7,1,3,1",
- "1,4,Hello world,17,8,2,4,1",
- "1,5,Hello world,35,11,3,7,1",
- "1,6,Hello world,35,11,3,7,1",
- "1,7,Hello world,35,11,3,7,1",
- "2,4,Hello world,44,13,3,7,1",
- "2,5,Hello world,44,13,3,7,1"
- )
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test sliding event-time unbounded RANGE window **/
- @Test
- def testUnboundedPartitionedEventTimeRangeWindow(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- StreamITCase.clear
- env.setParallelism(1)
-
- val sqlQuery = "SELECT a, b, c, " +
- "SUM(b) over (" +
- "partition by a order by rowtime range between unbounded preceding and current row), " +
- "count(b) over (" +
- "partition by a order by rowtime range between unbounded preceding and current row), " +
- "avg(b) over (" +
- "partition by a order by rowtime range between unbounded preceding and current row), " +
- "max(b) over (" +
- "partition by a order by rowtime range between unbounded preceding and current row), " +
- "min(b) over (" +
- "partition by a order by rowtime range between unbounded preceding and current row) " +
- "from T1"
-
- val data = Seq(
- Left(14000005L, (1, 1L, "Hi")),
- Left(14000000L, (2, 1L, "Hello")),
- Left(14000002L, (1, 1L, "Hello")),
- Left(14000002L, (1, 2L, "Hello")),
- Left(14000002L, (1, 3L, "Hello world")),
- Left(14000003L, (2, 2L, "Hello world")),
- Left(14000003L, (2, 3L, "Hello world")),
- Right(14000020L),
- Left(14000021L, (1, 4L, "Hello world")),
- Left(14000022L, (1, 5L, "Hello world")),
- Left(14000022L, (1, 6L, "Hello world")),
- Left(14000022L, (1, 7L, "Hello world")),
- Left(14000023L, (2, 4L, "Hello world")),
- Left(14000023L, (2, 5L, "Hello world")),
- Right(14000030L)
- )
-
- val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "1,1,Hello,6,3,2,3,1",
- "1,2,Hello,6,3,2,3,1",
- "1,3,Hello world,6,3,2,3,1",
- "1,1,Hi,7,4,1,3,1",
- "2,1,Hello,1,1,1,1,1",
- "2,2,Hello world,6,3,2,3,1",
- "2,3,Hello world,6,3,2,3,1",
- "1,4,Hello world,11,5,2,4,1",
- "1,5,Hello world,29,8,3,7,1",
- "1,6,Hello world,29,8,3,7,1",
- "1,7,Hello world,29,8,3,7,1",
- "2,4,Hello world,15,5,3,5,1",
- "2,5,Hello world,15,5,3,5,1"
- )
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testPartitionedProcTimeOverWindow(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setParallelism(1)
- StreamITCase.clear
-
- val t = StreamTestData.get5TupleDataStream(env)
- .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
- tEnv.registerTable("MyTable", t)
-
- val sqlQuery = "SELECT a, " +
- " SUM(c) OVER (" +
- " PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
- " MIN(c) OVER (" +
- " PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
- " FROM MyTable"
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "1,0,0",
- "2,1,1",
- "2,3,1",
- "3,3,3",
- "3,7,3",
- "3,12,3",
- "4,6,6",
- "4,13,6",
- "4,21,6",
- "4,24,7",
- "5,10,10",
- "5,21,10",
- "5,33,10",
- "5,36,11",
- "5,39,12")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testPartitionedProcTimeOverWindow2(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setParallelism(1)
- StreamITCase.clear
-
- val t = StreamTestData.get5TupleDataStream(env)
- .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
- tEnv.registerTable("MyTable", t)
-
- val sqlQuery = "SELECT a, " +
- " SUM(c) OVER (" +
- " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC , " +
- " MIN(c) OVER (" +
- " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " +
- " FROM MyTable"
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "1,0,0",
- "2,1,1",
- "2,3,1",
- "3,3,3",
- "3,7,3",
- "3,12,3",
- "4,6,6",
- "4,13,6",
- "4,21,6",
- "4,30,6",
- "5,10,10",
- "5,21,10",
- "5,33,10",
- "5,46,10",
- "5,60,10")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
-
- @Test
- def testNonPartitionedProcTimeOverWindow(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setParallelism(1)
- StreamITCase.clear
-
- val t = StreamTestData.get5TupleDataStream(env)
- .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
- tEnv.registerTable("MyTable", t)
-
- val sqlQuery = "SELECT a, " +
- " SUM(c) OVER (" +
- " ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
- " MIN(c) OVER (" +
- " ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
- " FROM MyTable"
-
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "1,0,0",
- "2,1,0",
- "2,3,0",
- "3,6,1",
- "3,9,2",
- "3,12,3",
- "4,15,4",
- "4,18,5",
- "4,21,6",
- "4,24,7",
- "5,27,8",
- "5,30,9",
- "5,33,10",
- "5,36,11",
- "5,39,12")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testNonPartitionedProcTimeOverWindow2(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setParallelism(1)
- StreamITCase.clear
-
- val t = StreamTestData.get5TupleDataStream(env)
- .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
- tEnv.registerTable("MyTable", t)
-
- val sqlQuery = "SELECT a, " +
- " SUM(c) OVER (" +
- " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " +
- " MIN(c) OVER (" +
- " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
- " FROM MyTable"
- val result = tEnv.sql(sqlQuery).toDataStream[Row]
- result.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = List(
- "1,0,0",
- "2,1,0",
- "2,3,0",
- "3,6,0",
- "3,10,0",
- "3,15,0",
- "4,21,0",
- "4,28,0",
- "4,36,0",
- "4,45,0",
- "5,55,0",
- "5,66,1",
- "5,77,2",
- "5,88,3",
- "5,99,4")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
-}
-
-object SqlITCase {
-
- class EventTimeSourceFunction[T](
- dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
- override def run(ctx: SourceContext[T]): Unit = {
- dataWithTimestampList.foreach {
- case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
- case Right(w) => ctx.emitWatermark(new Watermark(w))
- }
- }
-
- override def cancel(): Unit = ???
- }
}
[2/3] flink git commit: [FLINK-6257] [table] Refactor OVER window tests.
Posted by fh...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 4c1d6e6..125d071 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -32,64 +32,9 @@ class WindowAggregateTest extends TableTestBase {
"MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
@Test
- def testNonPartitionedProcessingTimeBoundedWindow() = {
-
- val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY proctime " +
- "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA " +
- "FROM MyTable"
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "proctime")
- ),
- term("orderBy", "proctime"),
- term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0")
- ),
- term("select", "a", "w0$o0 AS $1")
- )
-
- streamUtil.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testPartitionedProcessingTimeBoundedWindow() = {
-
- val sqlQuery =
- "SELECT a, " +
- " AVG(c) OVER (PARTITION BY a ORDER BY proctime " +
- " RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
- "FROM MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "proctime")
- ),
- term("partitionBy","a"),
- term("orderBy", "proctime"),
- term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0", "$SUM0(c) AS w0$o1")
- ),
- term("select", "a", "/(CASE(>(w0$o0, 0)", "CAST(w0$o1), null), w0$o0) AS avgA")
- )
-
- streamUtil.verifySql(sqlQuery, expected)
- }
-
- @Test
def testGroupbyWithoutWindow() = {
val sql = "SELECT COUNT(a) FROM MyTable GROUP BY b"
+
val expected =
unaryNode(
"DataStreamCalc",
@@ -241,327 +186,4 @@ class WindowAggregateTest extends TableTestBase {
streamUtil.verifySql(sqlQuery, "n/a")
}
-
- @Test
- def testUnboundPartitionedProcessingWindowWithRange() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "proctime")
- ),
- term("partitionBy", "c"),
- term("orderBy", "proctime"),
- term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
- ),
- term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testUnboundPartitionedProcessingWindowWithRow() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
- "CURRENT ROW) as cnt1 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- streamTableNode(0),
- term("partitionBy", "c"),
- term("orderBy", "proctime"),
- term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
- ),
- term("select", "c", "w0$o0 AS $1")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testUnboundNonPartitionedProcessingWindowWithRange() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "proctime")
- ),
- term("orderBy", "proctime"),
- term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
- ),
- term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testUnboundNonPartitionedProcessingWindowWithRow() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
- "CURRENT ROW) as cnt1 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- streamTableNode(0),
- term("orderBy", "proctime"),
- term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
- ),
- term("select", "c", "w0$o0 AS $1")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testUnboundNonPartitionedEventTimeWindowWithRange() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "rowtime")
- ),
- term("orderBy", "rowtime"),
- term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
- ),
- term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testUnboundPartitionedEventTimeWindowWithRange() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "rowtime")
- ),
- term("partitionBy", "c"),
- term("orderBy", "rowtime"),
- term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
- ),
- term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testBoundPartitionedRowTimeWindowWithRow() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
- "CURRENT ROW) as cnt1 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "rowtime")
- ),
- term("partitionBy", "c"),
- term("orderBy", "rowtime"),
- term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
- ),
- term("select", "c", "w0$o0 AS $1")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testBoundNonPartitionedRowTimeWindowWithRow() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
- "CURRENT ROW) as cnt1 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "rowtime")
- ),
- term("orderBy", "rowtime"),
- term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
- ),
- term("select", "c", "w0$o0 AS $1")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testBoundPartitionedRowTimeWindowWithRange() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY rowtime " +
- "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "rowtime")
- ),
- term("partitionBy", "c"),
- term("orderBy", "rowtime"),
- term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
- ),
- term("select", "c", "w0$o0 AS $1")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testBoundNonPartitionedRowTimeWindowWithRange() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (ORDER BY rowtime " +
- "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "rowtime")
- ),
- term("orderBy", "rowtime"),
- term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
- ),
- term("select", "c", "w0$o0 AS $1")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testBoundNonPartitionedProcTimeWindowWithRowRange() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
- "CURRENT ROW) as cnt1 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "proctime")
- ),
- term("orderBy", "proctime"),
- term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
- ),
- term("select", "c", "w0$o0 AS $1")
- )
- streamUtil.verifySql(sql, expected)
- }
-
- @Test
- def testBoundPartitionedProcTimeWindowWithRowRange() = {
- val sql = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
- "CURRENT ROW) as cnt1 " +
- "from MyTable"
-
- val expected =
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "proctime")
- ),
- term("partitionBy", "c"),
- term("orderBy", "proctime"),
- term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
- ),
- term("select", "c", "w0$o0 AS $1")
- )
- streamUtil.verifySql(sql, expected)
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
new file mode 100644
index 0000000..eb5acd5b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import java.util.{Comparator, Queue => JQueue}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, TestHarnessUtil}
+import org.apache.flink.table.runtime.types.CRow
+
+class HarnessTestBase {
+ def createHarnessTester[IN, OUT, KEY](
+ operator: OneInputStreamOperator[IN, OUT],
+ keySelector: KeySelector[IN, KEY],
+ keyType: TypeInformation[KEY]): KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = {
+ new KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT](operator, keySelector, keyType)
+ }
+
+ def verify(
+ expected: JQueue[Object],
+ actual: JQueue[Object],
+ comparator: Comparator[Object],
+ checkWaterMark: Boolean = false): Unit = {
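+    // If watermarks are not part of the expected output, drop them before comparing.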
+ if (!checkWaterMark) {
+ val it = actual.iterator()
+ while (it.hasNext) {
+ val data = it.next()
+ if (data.isInstanceOf[Watermark]) {
+ actual.remove(data)
+ }
+ }
+ }
+    TestHarnessUtil.assertOutputEqualsSorted("Verification failed", expected, actual, comparator)
+ }
+}
+
+object HarnessTestBase {
+
+ /**
+   * Returns 0 for equal rows and a non-zero value for different rows.
+ */
+ class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with Serializable {
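+    // Note: the indexCounter parameter is currently not used by the comparison itself.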
+
+ override def compare(o1: Object, o2: Object): Int = {
+
+ if (o1.isInstanceOf[Watermark] || o2.isInstanceOf[Watermark]) {
+ // watermark is not expected
+ -1
+ } else {
+ val row1 = o1.asInstanceOf[StreamRecord[CRow]].getValue
+ val row2 = o2.asInstanceOf[StreamRecord[CRow]].getValue
+ row1.toString.compareTo(row2.toString)
+ }
+ }
+ }
+
+ /**
+   * Key selector that returns the value of a specified row field as the key.
+ */
+ class TupleRowKeySelector[T](
+ private val selectorField: Int) extends KeySelector[CRow, T] {
+
+ override def getKey(value: CRow): T = {
+ value.row.getField(selectorField).asInstanceOf[T]
+ }
+ }
+
+}
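For orientation, a minimal usage sketch of this base class (the operator, test
class name, and field index below are hypothetical placeholders, not from the
patch): a subclass builds the keyed harness, feeds records, and verifies the
collected output against an expected queue.

    import java.util.concurrent.ConcurrentLinkedQueue
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator
    import org.apache.flink.table.runtime.harness.HarnessTestBase._
    import org.apache.flink.table.runtime.types.CRow
    import org.junit.Test

    class ExampleHarnessTest extends HarnessTestBase {
      @Test
      def testSomeOperator(): Unit = {
        // Placeholder for the OneInputStreamOperator[CRow, CRow] under test.
        val operator: OneInputStreamOperator[CRow, CRow] = ???

        val testHarness = createHarnessTester(
          operator,
          new TupleRowKeySelector[String](3), // key on row field 3
          BasicTypeInfo.STRING_TYPE_INFO)

        testHarness.open()
        // feed input with testHarness.processElement(...) / processWatermark(...)
        val expected = new ConcurrentLinkedQueue[Object]()
        verify(expected, testHarness.getOutput, new RowResultSortComparator(0))
        testHarness.close()
      }
    }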
http://git-wip-us.apache.org/repos/asf/flink/blob/9f2293cf/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
new file mode 100644
index 0000000..56ca85c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -0,0 +1,974 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.harness
+
+import java.lang.{Integer => JInt, Long => JLong}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+import org.apache.flink.table.codegen.GeneratedAggregationsFunction
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.runtime.harness.HarnessTestBase._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class OverWindowHarnessTest extends HarnessTestBase {
+
+ private val rT = new RowTypeInfo(Array[TypeInformation[_]](
+ INT_TYPE_INFO,
+ LONG_TYPE_INFO,
+ INT_TYPE_INFO,
+ STRING_TYPE_INFO,
+ LONG_TYPE_INFO),
+ Array("a", "b", "c", "d", "e"))
+
+ private val cRT = new CRowTypeInfo(rT)
+
+ private val aggregates =
+ Array(new LongMinWithRetractAggFunction,
+ new LongMaxWithRetractAggFunction).asInstanceOf[Array[AggregateFunction[_, _]]]
+ private val aggregationStateType: RowTypeInfo = AggregateUtil.createAccumulatorRowType(aggregates)
+
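+  // Hand-written equivalent of a code-generated GeneratedAggregations class: it
+  // deserializes the Base64-encoded min/max aggregate functions and maps them to
+  // accumulator fields 0/1 and output fields 5/6.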
+ val funcCode: String =
+ """
+ |public class BoundedOverAggregateHelper
+ | extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations {
+ |
+ | transient org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction
+ | fmin = null;
+ |
+ | transient org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction
+ | fmax = null;
+ |
+ | public BoundedOverAggregateHelper() throws Exception {
+ |
+ | fmin = (org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction)
+ | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+ | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
+ | "MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
+ | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbq_ZGuzxtA_S" +
+ | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
+ | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
+ | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
+ | "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
+ |
+ | fmax = (org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction)
+ | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+ | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" +
+ | "MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" +
+ | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbvnwowlX0_Qf" +
+ | "AgABTAADb3JkdAAVTHNjYWxhL21hdGgvT3JkZXJpbmc7eHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnV" +
+ | "uY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uTcYVPtJjNfwCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS" +
+ | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" +
+ | "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
+ | }
+ |
+ | public void setAggregationResults(
+ | org.apache.flink.types.Row accs,
+ | org.apache.flink.types.Row output) {
+ |
+ | org.apache.flink.table.functions.AggregateFunction baseClass0 =
+ | (org.apache.flink.table.functions.AggregateFunction) fmin;
+ | output.setField(5, baseClass0.getValue(
+ | (org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+ | accs.getField(0)));
+ |
+ | org.apache.flink.table.functions.AggregateFunction baseClass1 =
+ | (org.apache.flink.table.functions.AggregateFunction) fmax;
+ | output.setField(6, baseClass1.getValue(
+ | (org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+ | accs.getField(1)));
+ | }
+ |
+ | public void accumulate(
+ | org.apache.flink.types.Row accs,
+ | org.apache.flink.types.Row input) {
+ |
+ | fmin.accumulate(
+ | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+ | accs.getField(0)),
+ | (java.lang.Long) input.getField(4));
+ |
+ | fmax.accumulate(
+ | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+ | accs.getField(1)),
+ | (java.lang.Long) input.getField(4));
+ | }
+ |
+ | public void retract(
+ | org.apache.flink.types.Row accs,
+ | org.apache.flink.types.Row input) {
+ |
+ | fmin.retract(
+ | ((org.apache.flink.table.functions.aggfunctions.MinWithRetractAccumulator)
+ | accs.getField(0)),
+ | (java.lang.Long) input.getField(4));
+ |
+ | fmax.retract(
+ | ((org.apache.flink.table.functions.aggfunctions.MaxWithRetractAccumulator)
+ | accs.getField(1)),
+ | (java.lang.Long) input.getField(4));
+ | }
+ |
+ | public org.apache.flink.types.Row createAccumulators() {
+ |
+ | org.apache.flink.types.Row accs = new org.apache.flink.types.Row(2);
+ |
+ | accs.setField(
+ | 0,
+ | fmin.createAccumulator());
+ |
+ | accs.setField(
+ | 1,
+ | fmax.createAccumulator());
+ |
+ | return accs;
+ | }
+ |
+ | public void setForwardedFields(
+ | org.apache.flink.types.Row input,
+ | org.apache.flink.types.Row output) {
+ |
+ | output.setField(0, input.getField(0));
+ | output.setField(1, input.getField(1));
+ | output.setField(2, input.getField(2));
+ | output.setField(3, input.getField(3));
+ | output.setField(4, input.getField(4));
+ | }
+ |
+ | public org.apache.flink.types.Row createOutputRow() {
+ | return new org.apache.flink.types.Row(7);
+ | }
+ |
+ |/******* This test does not use the following methods *******/
+ | public org.apache.flink.types.Row mergeAccumulatorsPair(
+ | org.apache.flink.types.Row a,
+ | org.apache.flink.types.Row b) {
+ | return null;
+ | }
+ |
+ | public void resetAccumulator(org.apache.flink.types.Row accs) {
+ | }
+ |
+ | public void setConstantFlags(org.apache.flink.types.Row output) {
+ | }
+ |}
+ """.stripMargin
+
+ private val funcName = "BoundedOverAggregateHelper"
+
+ private val genAggFunction = GeneratedAggregationsFunction(funcName, funcCode)
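+
+  // The over-window process functions compile this source string at runtime, so
+  // these tests exercise the same path as planner-generated aggregation code.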
+
+ @Test
+ def testProcTimeBoundedRowsOver(): Unit = {
+
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new ProcTimeBoundedRowsOver(
+ genAggFunction,
+ 2,
+ aggregationStateType,
+ cRT))
+
+    val testHarness =
+      createHarnessTester(
+        processFunction,
+        new TupleRowKeySelector[Integer](0),
+        BasicTypeInfo.INT_TYPE_INFO)
+
+ testHarness.open()
+
+ testHarness.setProcessingTime(1)
+
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 1))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 1))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 1))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 1))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 1))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 1))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 1))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 1))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 1))
+
+ testHarness.setProcessingTime(2)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 2))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 2))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 2))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 2))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 2))
+
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 1))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 1))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 1))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 2L: JLong, 3L: JLong), true), 1))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 1))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 3L: JLong, 4L: JLong), true), 1))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 4L: JLong, 5L: JLong), true), 1))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true), 1))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 20L: JLong, 30L: JLong), true), 1))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 6L: JLong, 7L: JLong), true), 2))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 7L: JLong, 8L: JLong), true), 2))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 8L: JLong, 9L: JLong), true), 2))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 9L: JLong, 10L: JLong), true), 2))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 2))
+
+ verify(expectedOutput, result, new RowResultSortComparator(6))
+
+ testHarness.close()
+ }
+
+ /**
+   * NOTE: all elements processed at the same processing timestamp get the same
+   * aggregate values per key.
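+   * (e.g. the two "aaa" rows arriving at processing time 1002 both report
+   * min 1 and max 6)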
+ */
+ @Test
+ def testProcTimeBoundedRangeOver(): Unit = {
+
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new ProcTimeBoundedRangeOver(
+ genAggFunction,
+ 1000,
+ aggregationStateType,
+ cRT))
+
+ val testHarness =
+ createHarnessTester(
+ processFunction,
+ new TupleRowKeySelector[Integer](0),
+ BasicTypeInfo.INT_TYPE_INFO)
+
+ testHarness.open()
+
+ testHarness.setProcessingTime(3)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0))
+
+ testHarness.setProcessingTime(4)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0))
+
+ testHarness.setProcessingTime(5)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))
+
+ testHarness.setProcessingTime(6)
+
+ testHarness.setProcessingTime(1002)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
+
+ testHarness.setProcessingTime(1003)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0))
+
+ testHarness.setProcessingTime(1004)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0))
+
+ testHarness.setProcessingTime(1005)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 0))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 0))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 0))
+
+ testHarness.setProcessingTime(1006)
+
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ // all elements at the same proc timestamp have the same value per key
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 4))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 4))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 5))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 5))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 5))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 6))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 1003))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 1003))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 1003))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1004))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 1005))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), true), 1006))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), true), 1006))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 1006))
+
+ verify(expectedOutput, result, new RowResultSortComparator(6))
+
+ testHarness.close()
+ }
+
+ @Test
+ def testProcTimeUnboundedOver(): Unit = {
+
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new ProcTimeUnboundedPartitionedOver(
+ genAggFunction,
+ aggregationStateType))
+
+ val testHarness =
+ createHarnessTester(
+ processFunction,
+ new TupleRowKeySelector[Integer](0),
+ BasicTypeInfo.INT_TYPE_INFO)
+
+ testHarness.open()
+
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
+
+ testHarness.setProcessingTime(1003)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 1003))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 1003))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 1003))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 1003))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 1003))
+
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 0))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 0))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 0))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 0))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 0))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 0))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true), 0))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 0))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 0))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1003))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 1003))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 9L: JLong), true), 1003))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true), 1003))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 1003))
+
+ verify(expectedOutput, result, new RowResultSortComparator(6))
+ testHarness.close()
+ }
+
+ /**
+   * All elements at the same row-time get the same aggregate values per key.
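+   * (e.g. the two "aaa" rows at row-time 6501 both report min 2 and max 6)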
+ */
+ @Test
+ def testRowTimeBoundedRangeOver(): Unit = {
+
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new RowTimeBoundedRangeOver(
+ genAggFunction,
+ aggregationStateType,
+ cRT,
+ 4000))
+
+ val testHarness =
+ createHarnessTester(
+ processFunction,
+ new TupleRowKeySelector[String](3),
+ BasicTypeInfo.STRING_TYPE_INFO)
+
+ testHarness.open()
+
+ testHarness.processWatermark(1)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 2))
+
+ testHarness.processWatermark(2)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 3))
+
+ testHarness.processWatermark(4000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001))
+
+ testHarness.processWatermark(4001)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4002))
+
+ testHarness.processWatermark(4002)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "aaa", 4L: JLong), true), 4003))
+
+ testHarness.processWatermark(4800)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 11L: JLong, 1: JInt, "bbb", 25L: JLong), true), 4801))
+
+ testHarness.processWatermark(6500)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501))
+
+ testHarness.processWatermark(7000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001))
+
+ testHarness.processWatermark(8000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001))
+
+ testHarness.processWatermark(12000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001))
+
+ testHarness.processWatermark(19000)
+
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ // all elements at the same row-time have the same value per key
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 2))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 3))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 4001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4002))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 0L: JLong, 0: JInt, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true), 4003))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 11L: JLong, 1: JInt, "bbb", 25L: JLong, 25L: JLong, 25L: JLong), true), 4801))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 2L: JLong, 6L: JLong), true), 6501))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 2L: JLong, 6L: JLong), true), 6501))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 2L: JLong, 7L: JLong), true), 7001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 8001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 25L: JLong, 30L: JLong), true), 6501))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 8L: JLong, 10L: JLong), true), 12001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true), 12001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true), 12001))
+
+ verify(expectedOutput, result, new RowResultSortComparator(6))
+ testHarness.close()
+ }
+
+ @Test
+ def testRowTimeBoundedRowsOver(): Unit = {
+
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new RowTimeBoundedRowsOver(
+ genAggFunction,
+ aggregationStateType,
+ cRT,
+ 3))
+
+ val testHarness =
+ createHarnessTester(
+ processFunction,
+ new TupleRowKeySelector[String](3),
+ BasicTypeInfo.STRING_TYPE_INFO)
+
+ testHarness.open()
+
+ testHarness.processWatermark(800)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))
+
+ testHarness.processWatermark(2500)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 2501))
+
+ testHarness.processWatermark(4000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4001))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 4001))
+
+ testHarness.processWatermark(4800)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 4801))
+
+ testHarness.processWatermark(6500)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501))
+
+ testHarness.processWatermark(7000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001))
+
+ testHarness.processWatermark(8000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001))
+
+ testHarness.processWatermark(12000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001))
+
+ testHarness.processWatermark(19000)
+
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 801))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 2501))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 4001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 4001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true), 4801))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 3L: JLong, 5L: JLong), true), 6501))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 4L: JLong, 6L: JLong), true), 6501))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 6501))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 5L: JLong, 7L: JLong), true), 7001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 6L: JLong, 8L: JLong), true), 8001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 7L: JLong, 9L: JLong), true), 12001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true), 12001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 20L: JLong, 40L: JLong), true), 12001))
+
+ verify(expectedOutput, result, new RowResultSortComparator(6))
+ testHarness.close()
+ }
+
+ /**
+   * All elements at the same row-time get the same aggregate values per key.
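+   * (e.g. the two "aaa" rows at row-time 6501 both report min 1 and max 6)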
+ */
+ @Test
+ def testRowTimeUnboundedRangeOver(): Unit = {
+
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new RowTimeUnboundedRangeOver(
+ genAggFunction,
+ aggregationStateType,
+ cRT))
+
+ val testHarness =
+ createHarnessTester(
+ processFunction,
+ new TupleRowKeySelector[String](3),
+ BasicTypeInfo.STRING_TYPE_INFO)
+
+ testHarness.open()
+
+ testHarness.processWatermark(800)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))
+
+ testHarness.processWatermark(2500)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 2501))
+
+ testHarness.processWatermark(4000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4001))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 4001))
+
+ testHarness.processWatermark(4800)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 4801))
+
+ testHarness.processWatermark(6500)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501))
+
+ testHarness.processWatermark(7000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001))
+
+ testHarness.processWatermark(8000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001))
+
+ testHarness.processWatermark(12000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001))
+
+ testHarness.processWatermark(19000)
+
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ // all elements at the same row-time have the same value per key
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 801))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 2501))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 4001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 4001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 4801))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 6501))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 6501))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 6501))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 7001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 8001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 10L: JLong), true), 12001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true), 12001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 12001))
+
+ verify(expectedOutput, result, new RowResultSortComparator(6))
+ testHarness.close()
+ }
+
+ @Test
+ def testRowTimeUnboundedRowsOver(): Unit = {
+
+ val processFunction = new KeyedProcessOperator[String, CRow, CRow](
+ new RowTimeUnboundedRowsOver(
+ genAggFunction,
+ aggregationStateType,
+ cRT))
+
+ val testHarness =
+ createHarnessTester(
+ processFunction,
+ new TupleRowKeySelector[String](3),
+ BasicTypeInfo.STRING_TYPE_INFO)
+
+ testHarness.open()
+
+ testHarness.processWatermark(800)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 801))
+
+ testHarness.processWatermark(2500)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 2501))
+
+ testHarness.processWatermark(4000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 4001))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 4001))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 4001))
+
+ testHarness.processWatermark(4800)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 4801))
+
+ testHarness.processWatermark(6500)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 6501))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 6501))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 6501))
+
+ testHarness.processWatermark(7000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 7001))
+
+ testHarness.processWatermark(8000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 8001))
+
+ testHarness.processWatermark(12000)
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 12001))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 12001))
+ testHarness.processElement(new StreamRecord(
+ CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 12001))
+
+ testHarness.processWatermark(19000)
+
+ val result = testHarness.getOutput
+
+ val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 801))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 2501))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true), 4001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 4001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 4001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 4801))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true), 6501))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 6501))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 6501))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 7001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true), 8001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 1L: JLong, 9L: JLong), true), 12001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true), 12001))
+ expectedOutput.add(new StreamRecord(
+ CRow(
+ Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true), 12001))
+
+ verify(expectedOutput, result, new RowResultSortComparator(6))
+ testHarness.close()
+ }
+}