You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:36 UTC
[02/15] flink git commit: [FLINK-5884] [table] Integrate time
indicators for Table API & SQL
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
index 63dc1ae..31ad558 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala
@@ -24,7 +24,7 @@ import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable}
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.utils.{AggSqlFunction, ScalarSqlFunction, TableSqlFunction}
-import org.apache.flink.table.functions.{AggregateFunction, EventTimeExtractor, RowTime, ScalarFunction, TableFunction, _}
+import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -242,15 +242,11 @@ object FunctionCatalog {
// array
"cardinality" -> classOf[ArrayCardinality],
"at" -> classOf[ArrayElementAt],
- "element" -> classOf[ArrayElement],
+ "element" -> classOf[ArrayElement]
// TODO implement function overloading here
// "floor" -> classOf[TemporalFloor]
// "ceil" -> classOf[TemporalCeil]
-
- // extensions to support streaming query
- "rowtime" -> classOf[RowTime],
- "proctime" -> classOf[ProcTime]
)
/**
@@ -392,8 +388,6 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
SqlStdOperatorTable.ROUND,
SqlStdOperatorTable.PI,
// EXTENSIONS
- EventTimeExtractor,
- ProcTimeExtractor,
SqlStdOperatorTable.TUMBLE,
SqlStdOperatorTable.TUMBLE_START,
SqlStdOperatorTable.TUMBLE_END,
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index cab3855..81c60b4 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -406,15 +406,6 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
}
@Test(expected = TableException.class)
- public void testAsWithToFewFields() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
- // Must fail. Not enough field names specified.
- tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
- }
-
- @Test(expected = TableException.class)
public void testAsWithToManyFields() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index 9939a9c..faacc54 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -93,7 +93,8 @@ class TableEnvironmentTest extends TableTestBase {
UnresolvedFieldReference("name1"),
UnresolvedFieldReference("name2"),
UnresolvedFieldReference("name3")
- ))
+ ),
+ ignoreTimeAttributes = true)
fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -107,7 +108,8 @@ class TableEnvironmentTest extends TableTestBase {
UnresolvedFieldReference("name1"),
UnresolvedFieldReference("name2"),
UnresolvedFieldReference("name3")
- ))
+ ),
+ ignoreTimeAttributes = true)
fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -121,7 +123,8 @@ class TableEnvironmentTest extends TableTestBase {
UnresolvedFieldReference("name1"),
UnresolvedFieldReference("name2"),
UnresolvedFieldReference("name3")
- ))
+ ),
+ ignoreTimeAttributes = true)
}
@Test
@@ -132,7 +135,8 @@ class TableEnvironmentTest extends TableTestBase {
UnresolvedFieldReference("pf3"),
UnresolvedFieldReference("pf1"),
UnresolvedFieldReference("pf2")
- ))
+ ),
+ ignoreTimeAttributes = true)
fieldInfo._1.zip(Array("pf3", "pf1", "pf2")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
@@ -142,7 +146,8 @@ class TableEnvironmentTest extends TableTestBase {
def testGetFieldInfoAtomicName1(): Unit = {
val fieldInfo = tEnv.getFieldInfo(
atomicType,
- Array(UnresolvedFieldReference("name"))
+ Array(UnresolvedFieldReference("name")),
+ ignoreTimeAttributes = true
)
fieldInfo._1.zip(Array("name")).foreach(x => assertEquals(x._2, x._1))
@@ -156,7 +161,8 @@ class TableEnvironmentTest extends TableTestBase {
Array(
UnresolvedFieldReference("name1"),
UnresolvedFieldReference("name2")
- ))
+ ),
+ ignoreTimeAttributes = true)
}
@Test
@@ -167,7 +173,8 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("f0"), "name1"),
Alias(UnresolvedFieldReference("f1"), "name2"),
Alias(UnresolvedFieldReference("f2"), "name3")
- ))
+ ),
+ ignoreTimeAttributes = true)
fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -181,7 +188,8 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("f2"), "name1"),
Alias(UnresolvedFieldReference("f0"), "name2"),
Alias(UnresolvedFieldReference("f1"), "name3")
- ))
+ ),
+ ignoreTimeAttributes = true)
fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
@@ -195,7 +203,8 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("xxx"), "name1"),
Alias(UnresolvedFieldReference("yyy"), "name2"),
Alias(UnresolvedFieldReference("zzz"), "name3")
- ))
+ ),
+ ignoreTimeAttributes = true)
}
@Test
@@ -206,7 +215,8 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("cf1"), "name1"),
Alias(UnresolvedFieldReference("cf2"), "name2"),
Alias(UnresolvedFieldReference("cf3"), "name3")
- ))
+ ),
+ ignoreTimeAttributes = true)
fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -220,7 +230,8 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("cf3"), "name1"),
Alias(UnresolvedFieldReference("cf1"), "name2"),
Alias(UnresolvedFieldReference("cf2"), "name3")
- ))
+ ),
+ ignoreTimeAttributes = true)
fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
@@ -234,7 +245,8 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("xxx"), "name1"),
Alias(UnresolvedFieldReference("yyy"), "name2"),
Alias(UnresolvedFieldReference("zzz"), "name3")
- ))
+ ),
+ ignoreTimeAttributes = true)
}
@Test
@@ -245,7 +257,8 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("pf1"), "name1"),
Alias(UnresolvedFieldReference("pf2"), "name2"),
Alias(UnresolvedFieldReference("pf3"), "name3")
- ))
+ ),
+ ignoreTimeAttributes = true)
fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -259,7 +272,8 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("pf3"), "name1"),
Alias(UnresolvedFieldReference("pf1"), "name2"),
Alias(UnresolvedFieldReference("pf2"), "name3")
- ))
+ ),
+ ignoreTimeAttributes = true)
fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
@@ -272,8 +286,9 @@ class TableEnvironmentTest extends TableTestBase {
Array(
Alias(UnresolvedFieldReference("xxx"), "name1"),
Alias(UnresolvedFieldReference("yyy"), "name2"),
- Alias( UnresolvedFieldReference("zzz"), "name3")
- ))
+ Alias(UnresolvedFieldReference("zzz"), "name3")
+ ),
+ ignoreTimeAttributes = true)
}
@Test(expected = classOf[TableException])
@@ -282,12 +297,16 @@ class TableEnvironmentTest extends TableTestBase {
atomicType,
Array(
Alias(UnresolvedFieldReference("name1"), "name2")
- ))
+ ),
+ ignoreTimeAttributes = true)
}
@Test(expected = classOf[TableException])
def testGetFieldInfoGenericRowAlias(): Unit = {
- tEnv.getFieldInfo(genericRowType, Array(UnresolvedFieldReference("first")))
+ tEnv.getFieldInfo(
+ genericRowType,
+ Array(UnresolvedFieldReference("first")),
+ ignoreTimeAttributes = true)
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
index e61e190..57ee3b3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -208,16 +208,6 @@ class TableEnvironmentITCase(
}
@Test(expected = classOf[TableException])
- def testToTableWithToFewFields(): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
- CollectionDataSets.get3TupleDataSet(env)
- // Must fail. Number of fields does not match.
- .toTable(tEnv, 'a, 'b)
- }
-
- @Test(expected = classOf[TableException])
def testToTableWithToManyFields(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
index 0ccb557..71d0002 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
@@ -47,7 +47,7 @@ class WindowAggregateTest extends TableTestBase {
batchTableNode(0),
term("select", "ts, a, b")
),
- term("window", EventTimeTumblingGroupWindow('w$, 'ts, 7200000.millis)),
+ term("window", TumblingGroupWindow('w$, 'ts, 7200000.millis)),
term("select", "SUM(a) AS sumA, COUNT(b) AS cntB")
)
@@ -76,7 +76,7 @@ class WindowAggregateTest extends TableTestBase {
"DataSetWindowAggregate",
batchTableNode(0),
term("groupBy", "c"),
- term("window", EventTimeTumblingGroupWindow('w$, 'ts, 240000.millis)),
+ term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)),
term("select", "c, SUM(a) AS sumA, MIN(b) AS minB, " +
"start('w$) AS w$start, end('w$) AS w$end")
),
@@ -106,7 +106,7 @@ class WindowAggregateTest extends TableTestBase {
batchTableNode(0),
term("select", "ts, b, a")
),
- term("window", EventTimeTumblingGroupWindow('w$, 'ts, 240000.millis)),
+ term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)),
term("select", "weightedAvg(b, a) AS wAvg")
)
@@ -132,7 +132,7 @@ class WindowAggregateTest extends TableTestBase {
term("select", "ts, a, b")
),
term("window",
- EventTimeSlidingGroupWindow('w$, 'ts, 5400000.millis, 900000.millis)),
+ SlidingGroupWindow('w$, 'ts, 5400000.millis, 900000.millis)),
term("select", "SUM(a) AS sumA, COUNT(b) AS cntB")
)
@@ -162,7 +162,7 @@ class WindowAggregateTest extends TableTestBase {
batchTableNode(0),
term("groupBy", "c, d"),
term("window",
- EventTimeSlidingGroupWindow('w$, 'ts, 10800000.millis, 3600000.millis)),
+ SlidingGroupWindow('w$, 'ts, 10800000.millis, 3600000.millis)),
term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB, " +
"start('w$) AS w$start, end('w$) AS w$end")
),
@@ -188,7 +188,7 @@ class WindowAggregateTest extends TableTestBase {
batchTableNode(0),
term("select", "ts")
),
- term("window", EventTimeSessionGroupWindow('w$, 'ts, 1800000.millis)),
+ term("window", SessionGroupWindow('w$, 'ts, 1800000.millis)),
term("select", "COUNT(*) AS cnt")
)
@@ -217,7 +217,7 @@ class WindowAggregateTest extends TableTestBase {
"DataSetWindowAggregate",
batchTableNode(0),
term("groupBy", "c, d"),
- term("window", EventTimeSessionGroupWindow('w$, 'ts, 43200000.millis)),
+ term("window", SessionGroupWindow('w$, 'ts, 43200000.millis)),
term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB, " +
"start('w$) AS w$start, end('w$) AS w$end")
),
@@ -249,7 +249,7 @@ class WindowAggregateTest extends TableTestBase {
term("select", "ts, c")
),
term("groupBy", "c"),
- term("window", EventTimeTumblingGroupWindow('w$, 'ts, 240000.millis)),
+ term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)),
term("select", "c, start('w$) AS w$start, end('w$) AS w$end")
),
term("select", "CAST(w$end) AS w$end")
@@ -304,7 +304,7 @@ class WindowAggregateTest extends TableTestBase {
val sql = "SELECT COUNT(*) " +
"FROM T " +
- "GROUP BY TUMBLE(proctime(), b * INTERVAL '1' MINUTE)"
+ "GROUP BY TUMBLE(ts, b * INTERVAL '1' MINUTE)"
util.verifySql(sql, "n/a")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
index 6ebfec0..b484293 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
@@ -19,14 +19,12 @@ package org.apache.flink.table.api.scala.batch.table
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.expressions.{RowtimeAttribute, Upper, WindowReference}
-import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.api.scala.batch.table.FieldProjectionTest._
-import org.apache.flink.table.plan.logical.EventTimeTumblingGroupWindow
-import org.apache.flink.table.utils._
-import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.expressions.{Upper, WindowReference}
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.plan.logical.TumblingGroupWindow
import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{TableTestBase, _}
import org.junit.Test
/**
@@ -223,7 +221,8 @@ class FieldProjectionTest extends TableTestBase {
@Test
def testSelectFromStreamingWindow(): Unit = {
- val sourceTable = streamUtil.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+ val sourceTable = streamUtil
+ .addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'rowtime.rowtime)
val resultTable = sourceTable
.window(Tumble over 5.millis on 'rowtime as 'w)
.groupBy('w)
@@ -235,14 +234,14 @@ class FieldProjectionTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "c", "a", "UPPER(c) AS $f2")
+ term("select", "c", "a", "rowtime", "UPPER(c) AS $f3")
),
term("window",
- EventTimeTumblingGroupWindow(
- WindowReference("w"),
- RowtimeAttribute(),
+ TumblingGroupWindow(
+ WindowReference("w"),
+ 'rowtime,
5.millis)),
- term("select", "COUNT($f2) AS TMP_0", "SUM(a) AS TMP_1")
+ term("select", "COUNT($f3) AS TMP_0", "SUM(a) AS TMP_1")
)
streamUtil.verifyTable(resultTable, expected)
@@ -250,7 +249,8 @@ class FieldProjectionTest extends TableTestBase {
@Test
def testSelectFromStreamingGroupedWindow(): Unit = {
- val sourceTable = streamUtil.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
+ val sourceTable = streamUtil
+ .addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'rowtime.rowtime)
val resultTable = sourceTable
.window(Tumble over 5.millis on 'rowtime as 'w)
.groupBy('w, 'b)
@@ -263,15 +263,15 @@ class FieldProjectionTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "c", "a", "b", "UPPER(c) AS $f3")
+ term("select", "c", "a", "b", "rowtime", "UPPER(c) AS $f4")
),
term("groupBy", "b"),
term("window",
- EventTimeTumblingGroupWindow(
- WindowReference("w"),
- RowtimeAttribute(),
+ TumblingGroupWindow(
+ WindowReference("w"),
+ 'rowtime,
5.millis)),
- term("select", "b", "COUNT($f3) AS TMP_0", "SUM(a) AS TMP_1")
+ term("select", "b", "COUNT($f4) AS TMP_0", "SUM(a) AS TMP_1")
),
term("select", "TMP_0", "TMP_1", "b")
)
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
index 8a20f6d..c481105 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
@@ -63,25 +63,24 @@ class GroupWindowTest extends TableTestBase {
//===============================================================================================
@Test(expected = classOf[ValidationException])
- def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
+ def testInvalidProcessingTimeDefinition(): Unit = {
val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .window(Tumble over 50.milli as 'w) // require a time attribute
- .groupBy('w, 'string)
- .select('string, 'int.count)
+ // proctime is not allowed
+ util.addTable[(Long, Int, String)]('long.proctime, 'int, 'string)
}
@Test(expected = classOf[ValidationException])
- def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+ def testInvalidProcessingTimeDefinition2(): Unit = {
val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+ // proctime is not allowed
+ util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+ }
- table
- .window(Tumble over 2.rows as 'w) // require a time attribute
- .groupBy('w, 'string)
- .select('string, 'int.count)
+ @Test(expected = classOf[ValidationException])
+ def testInvalidEventTimeDefinition(): Unit = {
+ val util = batchTestUtil()
+ // definition must not extend schema
+ util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
}
@Test(expected = classOf[ValidationException])
@@ -101,7 +100,7 @@ class GroupWindowTest extends TableTestBase {
@Test
def testEventTimeTumblingGroupWindowOverCount(): Unit = {
val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+ val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
val windowedTable = table
.window(Tumble over 2.rows on 'long as 'w)
@@ -112,7 +111,7 @@ class GroupWindowTest extends TableTestBase {
"DataSetWindowAggregate",
batchTableNode(0),
term("groupBy", "string"),
- term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 2.rows)),
+ term("window", TumblingGroupWindow(WindowReference("w"), 'long, 2.rows)),
term("select", "string", "COUNT(int) AS TMP_0")
)
@@ -135,7 +134,7 @@ class GroupWindowTest extends TableTestBase {
"DataSetWindowAggregate",
batchTableNode(0),
term("groupBy", "string"),
- term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
+ term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
)
@@ -145,7 +144,7 @@ class GroupWindowTest extends TableTestBase {
@Test
def testEventTimeTumblingGroupWindowOverTime(): Unit = {
val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+ val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
val windowedTable = table
.window(Tumble over 5.milli on 'long as 'w)
@@ -156,7 +155,7 @@ class GroupWindowTest extends TableTestBase {
"DataSetWindowAggregate",
batchTableNode(0),
term("groupBy", "string"),
- term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
+ term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
term("select", "string", "COUNT(int) AS TMP_0")
)
@@ -164,28 +163,6 @@ class GroupWindowTest extends TableTestBase {
}
@Test(expected = classOf[ValidationException])
- def testAllProcessingTimeTumblingGroupWindowOverTime(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .window(Tumble over 50.milli as 'w) // require a time attribute
- .groupBy('w)
- .select('string, 'int.count)
- }
-
- @Test(expected = classOf[ValidationException])
- def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .window(Tumble over 2.rows as 'w) // require a time attribute
- .groupBy('w)
- .select('int.count)
- }
-
- @Test(expected = classOf[ValidationException])
def testAllTumblingGroupWindowWithInvalidUdAggArgs(): Unit = {
val util = batchTestUtil()
val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
@@ -216,7 +193,7 @@ class GroupWindowTest extends TableTestBase {
batchTableNode(0),
term("select", "int", "long")
),
- term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
+ term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)),
term("select", "COUNT(int) AS TMP_0")
)
@@ -240,7 +217,7 @@ class GroupWindowTest extends TableTestBase {
batchTableNode(0),
term("select", "int", "long")
),
- term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 2.rows)),
+ term("window", TumblingGroupWindow(WindowReference("w"), 'long, 2.rows)),
term("select", "COUNT(int) AS TMP_0")
)
@@ -252,28 +229,6 @@ class GroupWindowTest extends TableTestBase {
//===============================================================================================
@Test(expected = classOf[ValidationException])
- def testProcessingTimeSlidingGroupWindowOverTime(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .window(Slide over 50.milli every 50.milli as 'w) // require on a time attribute
- .groupBy('w, 'string)
- .select('string, 'int.count)
- }
-
- @Test(expected = classOf[ValidationException])
- def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .window(Slide over 10.rows every 5.rows as 'w) // require on a time attribute
- .groupBy('w, 'string)
- .select('string, 'int.count)
- }
-
- @Test(expected = classOf[ValidationException])
def testSlidingGroupWindowWithInvalidUdAggArgs(): Unit = {
val util = batchTestUtil()
val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
@@ -302,7 +257,7 @@ class GroupWindowTest extends TableTestBase {
batchTableNode(0),
term("groupBy", "string"),
term("window",
- EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
+ SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
term("select", "string", "COUNT(int) AS TMP_0")
)
@@ -324,7 +279,7 @@ class GroupWindowTest extends TableTestBase {
batchTableNode(0),
term("groupBy", "string"),
term("window",
- EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)),
+ SlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)),
term("select", "string", "COUNT(int) AS TMP_0")
)
@@ -348,7 +303,7 @@ class GroupWindowTest extends TableTestBase {
batchTableNode(0),
term("groupBy", "string"),
term("window",
- EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
+ SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
)
@@ -356,17 +311,6 @@ class GroupWindowTest extends TableTestBase {
}
@Test(expected = classOf[ValidationException])
- def testAllProcessingTimeSlidingGroupWindowOverCount(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- table
- .window(Slide over 2.rows every 1.rows as 'w) // require on a time attribute
- .groupBy('w)
- .select('int.count)
- }
-
- @Test(expected = classOf[ValidationException])
def testAllSlidingGroupWindowWithInvalidUdAggArgs(): Unit = {
val util = batchTestUtil()
val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
@@ -374,7 +318,7 @@ class GroupWindowTest extends TableTestBase {
val myWeightedAvg = new WeightedAvgWithMerge
table
- .window(Slide over 2.minutes every 1.minute on 'rowtime as 'w)
+ .window(Slide over 2.minutes every 1.minute on 'long as 'w)
.groupBy('w)
// invalid function arguments
.select(myWeightedAvg('int, 'string))
@@ -398,7 +342,7 @@ class GroupWindowTest extends TableTestBase {
term("select", "int", "long")
),
term("window",
- EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
+ SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)),
term("select", "COUNT(int) AS TMP_0")
)
@@ -423,7 +367,7 @@ class GroupWindowTest extends TableTestBase {
term("select", "int", "long")
),
term("window",
- EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)),
+ SlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)),
term("select", "COUNT(int) AS TMP_0")
)
@@ -448,7 +392,7 @@ class GroupWindowTest extends TableTestBase {
"DataSetWindowAggregate",
batchTableNode(0),
term("groupBy", "string"),
- term("window", EventTimeSessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
+ term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
term("select", "string", "COUNT(int) AS TMP_0")
)
@@ -471,7 +415,7 @@ class GroupWindowTest extends TableTestBase {
"DataSetWindowAggregate",
batchTableNode(0),
term("groupBy", "string"),
- term("window", EventTimeSessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
+ term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)),
term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
)
@@ -479,17 +423,6 @@ class GroupWindowTest extends TableTestBase {
}
@Test(expected = classOf[ValidationException])
- def testProcessingTimeSessionGroupWindowOverTime(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
- val windowedTable = table
- .window(Session withGap 7.milli as 'w) // require on a time attribute
- .groupBy('string, 'w)
- .select('string, 'int.count)
- }
-
- @Test(expected = classOf[ValidationException])
def testSessionGroupWindowWithInvalidUdAggArgs(): Unit = {
val util = batchTestUtil()
val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 67d13b0..6bab4b3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -197,14 +197,14 @@ class SqlITCase extends StreamingWithStateTestBase {
// for sum aggregation ensure that every time the order of each element is consistent
env.setParallelism(1)
- val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+ val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
tEnv.registerTable("T1", t1)
val sqlQuery = "SELECT " +
"c, " +
- "count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
+ "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
"from T1"
val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -224,13 +224,13 @@ class SqlITCase extends StreamingWithStateTestBase {
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.testResults = mutable.MutableList()
- val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+ val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
tEnv.registerTable("T1", t1)
val sqlQuery = "SELECT " +
"c, " +
- "count(a) OVER (PARTITION BY c ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND " +
+ "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
"CURRENT ROW)" +
"from T1"
@@ -254,14 +254,14 @@ class SqlITCase extends StreamingWithStateTestBase {
// for sum aggregation ensure that every time the order of each element is consistent
env.setParallelism(1)
- val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+ val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
tEnv.registerTable("T1", t1)
val sqlQuery = "SELECT " +
"c, " +
- "count(a) OVER (ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
+ "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
"from T1"
val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -281,12 +281,12 @@ class SqlITCase extends StreamingWithStateTestBase {
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.testResults = mutable.MutableList()
- val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+ val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
tEnv.registerTable("T1", t1)
val sqlQuery = "SELECT " +
- "count(a) OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+ "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
"from T1"
val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -328,14 +328,14 @@ class SqlITCase extends StreamingWithStateTestBase {
val t1 = env
.addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
- .toTable(tEnv).as('a, 'b, 'c)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
tEnv.registerTable("T1", t1)
val sqlQuery = "SELECT " +
"c, a, " +
- "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
- ", sum(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
+ "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
+ ", sum(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
" from T1"
val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -385,14 +385,14 @@ class SqlITCase extends StreamingWithStateTestBase {
val t1 = env
.addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
- .toTable(tEnv).as('a, 'b, 'c)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
tEnv.registerTable("T1", t1)
val sqlQuery = "SELECT " +
"c, a, " +
- "count(a) OVER (ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)," +
- "sum(a) OVER (ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
+ "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)," +
+ "sum(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
"from T1"
val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -453,15 +453,15 @@ class SqlITCase extends StreamingWithStateTestBase {
val t1 = env
.addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
- .toTable(tEnv).as('a, 'b, 'c)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
tEnv.registerTable("T1", t1)
val sqlQuery = "SELECT " +
"c, b, " +
- "count(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
+ "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
"preceding AND CURRENT ROW)" +
- ", sum(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
+ ", sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
" preceding AND CURRENT ROW)" +
" from T1"
@@ -525,15 +525,15 @@ class SqlITCase extends StreamingWithStateTestBase {
val t1 = env
.addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
- .toTable(tEnv).as('a, 'b, 'c)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
tEnv.registerTable("T1", t1)
val sqlQuery = "SELECT " +
"c, b, " +
- "count(a) OVER (ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
+ "count(a) OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
"preceding AND CURRENT ROW)" +
- ", sum(a) OVER (ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
+ ", sum(a) OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " +
" preceding AND CURRENT ROW)" +
" from T1"
@@ -565,14 +565,14 @@ class SqlITCase extends StreamingWithStateTestBase {
val tEnv = TableEnvironment.getTableEnvironment(env)
StreamITCase.testResults = mutable.MutableList()
- val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
+ val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
tEnv.registerTable("T1", t1)
val sqlQuery = "SELECT " +
"c, " +
- "count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (PARTITION BY b ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
+ "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
"from T1"
val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -592,15 +592,15 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, b, c, " +
"SUM(b) over (" +
- "partition by a order by rowtime() rows between unbounded preceding and current row), " +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
"count(b) over (" +
- "partition by a order by rowtime() rows between unbounded preceding and current row), " +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
"avg(b) over (" +
- "partition by a order by rowtime() rows between unbounded preceding and current row), " +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
"max(b) over (" +
- "partition by a order by rowtime() rows between unbounded preceding and current row), " +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
"min(b) over (" +
- "partition by a order by rowtime() rows between unbounded preceding and current row) " +
+ "partition by a order by rowtime rows between unbounded preceding and current row) " +
"from T1"
val data = Seq(
@@ -632,7 +632,7 @@ class SqlITCase extends StreamingWithStateTestBase {
)
val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv).as('a, 'b, 'c)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
tEnv.registerTable("T1", t1)
@@ -670,15 +670,15 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, b, c, " +
"SUM(b) over (" +
- "partition by a order by rowtime() rows between unbounded preceding and current row), " +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
"count(b) over (" +
- "partition by a order by rowtime() rows between unbounded preceding and current row), " +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
"avg(b) over (" +
- "partition by a order by rowtime() rows between unbounded preceding and current row), " +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
"max(b) over (" +
- "partition by a order by rowtime() rows between unbounded preceding and current row), " +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
"min(b) over (" +
- "partition by a order by rowtime() rows between unbounded preceding and current row) " +
+ "partition by a order by rowtime rows between unbounded preceding and current row) " +
"from T1"
val data = Seq(
@@ -702,7 +702,7 @@ class SqlITCase extends StreamingWithStateTestBase {
)
val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv).as('a, 'b, 'c)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
tEnv.registerTable("T1", t1)
@@ -740,11 +740,11 @@ class SqlITCase extends StreamingWithStateTestBase {
env.setParallelism(1)
val sqlQuery = "SELECT a, b, c, " +
- "SUM(b) over (order by rowtime() rows between unbounded preceding and current row), " +
- "count(b) over (order by rowtime() rows between unbounded preceding and current row), " +
- "avg(b) over (order by rowtime() rows between unbounded preceding and current row), " +
- "max(b) over (order by rowtime() rows between unbounded preceding and current row), " +
- "min(b) over (order by rowtime() rows between unbounded preceding and current row) " +
+ "SUM(b) over (order by rowtime rows between unbounded preceding and current row), " +
+ "count(b) over (order by rowtime rows between unbounded preceding and current row), " +
+ "avg(b) over (order by rowtime rows between unbounded preceding and current row), " +
+ "max(b) over (order by rowtime rows between unbounded preceding and current row), " +
+ "min(b) over (order by rowtime rows between unbounded preceding and current row) " +
"from T1"
val data = Seq(
@@ -764,7 +764,7 @@ class SqlITCase extends StreamingWithStateTestBase {
)
val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv).as('a, 'b, 'c)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
tEnv.registerTable("T1", t1)
@@ -795,11 +795,11 @@ class SqlITCase extends StreamingWithStateTestBase {
env.setParallelism(1)
val sqlQuery = "SELECT a, b, c, " +
- "SUM(b) over (order by rowtime() rows between unbounded preceding and current row), " +
- "count(b) over (order by rowtime() rows between unbounded preceding and current row), " +
- "avg(b) over (order by rowtime() rows between unbounded preceding and current row), " +
- "max(b) over (order by rowtime() rows between unbounded preceding and current row), " +
- "min(b) over (order by rowtime() rows between unbounded preceding and current row) " +
+ "SUM(b) over (order by rowtime rows between unbounded preceding and current row), " +
+ "count(b) over (order by rowtime rows between unbounded preceding and current row), " +
+ "avg(b) over (order by rowtime rows between unbounded preceding and current row), " +
+ "max(b) over (order by rowtime rows between unbounded preceding and current row), " +
+ "min(b) over (order by rowtime rows between unbounded preceding and current row) " +
"from T1"
val data = Seq(
@@ -820,7 +820,7 @@ class SqlITCase extends StreamingWithStateTestBase {
)
val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv).as('a, 'b, 'c)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
tEnv.registerTable("T1", t1)
@@ -852,11 +852,11 @@ class SqlITCase extends StreamingWithStateTestBase {
env.setParallelism(1)
val sqlQuery = "SELECT a, b, c, " +
- "SUM(b) over (order by rowtime() range between unbounded preceding and current row), " +
- "count(b) over (order by rowtime() range between unbounded preceding and current row), " +
- "avg(b) over (order by rowtime() range between unbounded preceding and current row), " +
- "max(b) over (order by rowtime() range between unbounded preceding and current row), " +
- "min(b) over (order by rowtime() range between unbounded preceding and current row) " +
+ "SUM(b) over (order by rowtime range between unbounded preceding and current row), " +
+ "count(b) over (order by rowtime range between unbounded preceding and current row), " +
+ "avg(b) over (order by rowtime range between unbounded preceding and current row), " +
+ "max(b) over (order by rowtime range between unbounded preceding and current row), " +
+ "min(b) over (order by rowtime range between unbounded preceding and current row) " +
"from T1"
val data = Seq(
@@ -878,7 +878,7 @@ class SqlITCase extends StreamingWithStateTestBase {
)
val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv).as('a, 'b, 'c)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
tEnv.registerTable("T1", t1)
@@ -916,15 +916,15 @@ class SqlITCase extends StreamingWithStateTestBase {
val sqlQuery = "SELECT a, b, c, " +
"SUM(b) over (" +
- "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "partition by a order by rowtime range between unbounded preceding and current row), " +
"count(b) over (" +
- "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "partition by a order by rowtime range between unbounded preceding and current row), " +
"avg(b) over (" +
- "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "partition by a order by rowtime range between unbounded preceding and current row), " +
"max(b) over (" +
- "partition by a order by rowtime() range between unbounded preceding and current row), " +
+ "partition by a order by rowtime range between unbounded preceding and current row), " +
"min(b) over (" +
- "partition by a order by rowtime() range between unbounded preceding and current row) " +
+ "partition by a order by rowtime range between unbounded preceding and current row) " +
"from T1"
val data = Seq(
@@ -946,7 +946,7 @@ class SqlITCase extends StreamingWithStateTestBase {
)
val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv).as('a, 'b, 'c)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
tEnv.registerTable("T1", t1)
@@ -981,14 +981,15 @@ class SqlITCase extends StreamingWithStateTestBase {
env.setParallelism(1)
StreamITCase.testResults = mutable.MutableList()
- val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ val t = StreamTestData.get5TupleDataStream(env)
+ .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
tEnv.registerTable("MyTable", t)
val sqlQuery = "SELECT a, " +
" SUM(c) OVER (" +
- " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
+ " PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
" MIN(c) OVER (" +
- " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
+ " PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
" FROM MyTable"
val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -1023,14 +1024,15 @@ class SqlITCase extends StreamingWithStateTestBase {
env.setParallelism(1)
StreamITCase.testResults = mutable.MutableList()
- val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ val t = StreamTestData.get5TupleDataStream(env)
+ .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
tEnv.registerTable("MyTable", t)
val sqlQuery = "SELECT a, " +
" SUM(c) OVER (" +
- " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC , " +
+ " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC , " +
" MIN(c) OVER (" +
- " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " +
+ " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " +
" FROM MyTable"
val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -1066,14 +1068,15 @@ class SqlITCase extends StreamingWithStateTestBase {
env.setParallelism(1)
StreamITCase.testResults = mutable.MutableList()
- val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ val t = StreamTestData.get5TupleDataStream(env)
+ .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
tEnv.registerTable("MyTable", t)
val sqlQuery = "SELECT a, " +
" SUM(c) OVER (" +
- " ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
+ " ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " +
" MIN(c) OVER (" +
- " ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
+ " ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " +
" FROM MyTable"
val result = tEnv.sql(sqlQuery).toDataStream[Row]
@@ -1108,14 +1111,15 @@ class SqlITCase extends StreamingWithStateTestBase {
env.setParallelism(1)
StreamITCase.testResults = mutable.MutableList()
- val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+ val t = StreamTestData.get5TupleDataStream(env)
+ .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
tEnv.registerTable("MyTable", t)
val sqlQuery = "SELECT a, " +
" SUM(c) OVER (" +
- " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " +
+ " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " +
" MIN(c) OVER (" +
- " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
+ " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " +
" FROM MyTable"
val result = tEnv.sql(sqlQuery).toDataStream[Row]
result.addSink(new StreamITCase.StringSink)
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index cef2665..edf7b1d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -21,22 +21,23 @@ import org.apache.flink.api.scala._
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMerge
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, ProcessingTimeSessionGroupWindow, ProcessingTimeSlidingGroupWindow}
+import org.apache.flink.table.plan.logical._
import org.apache.flink.table.utils.TableTestUtil._
import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.junit.Test
+import org.junit.{Ignore, Test}
class WindowAggregateTest extends TableTestBase {
private val streamUtil: StreamTableTestUtil = streamTestUtil()
- streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
+ streamUtil.addTable[(Int, String, Long)](
+ "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
@Test
def testNonPartitionedProcessingTimeBoundedWindow() = {
- val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY procTime()" +
- "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA FROM MyTable"
-
- val expected =
+ val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY proctime " +
+ "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA " +
+ "FROM MyTable"
+ val expected =
unaryNode(
"DataStreamCalc",
unaryNode(
@@ -44,11 +45,11 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "a", "c", "PROCTIME() AS $2")
+ term("select", "a", "c", "proctime")
),
- term("orderBy", "PROCTIME"),
+ term("orderBy", "proctime"),
term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0")
+ term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0")
),
term("select", "a", "w0$o0 AS $1")
)
@@ -59,7 +60,7 @@ class WindowAggregateTest extends TableTestBase {
@Test
def testPartitionedProcessingTimeBoundedWindow() = {
- val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" +
+ val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY proctime " +
"RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " +
"FROM MyTable"
val expected =
@@ -70,12 +71,12 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "a", "c", "PROCTIME() AS $2")
+ term("select", "a", "c", "proctime")
),
term("partitionBy","a"),
- term("orderBy", "PROCTIME"),
+ term("orderBy", "proctime"),
term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0", "$SUM0(c) AS w0$o1")
+ term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0", "$SUM0(c) AS w0$o1")
),
term("select", "a", "/(CASE(>(w0$o0, 0)", "CAST(w0$o1), null), w0$o0) AS avgA")
)
@@ -84,23 +85,24 @@ class WindowAggregateTest extends TableTestBase {
}
@Test
+ @Ignore // TODO enable once CALCITE-1761 is fixed
def testTumbleFunction() = {
streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
val sql =
"SELECT " +
" COUNT(*), weightedAvg(c, a) AS wAvg, " +
- " TUMBLE_START(rowtime(), INTERVAL '15' MINUTE), " +
- " TUMBLE_END(rowtime(), INTERVAL '15' MINUTE)" +
+ " TUMBLE_START(rowtime, INTERVAL '15' MINUTE), " +
+ " TUMBLE_END(rowtime, INTERVAL '15' MINUTE)" +
"FROM MyTable " +
- "GROUP BY TUMBLE(rowtime(), INTERVAL '15' MINUTE)"
+ "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
val expected =
unaryNode(
"DataStreamCalc",
unaryNode(
"DataStreamAggregate",
streamTableNode(0),
- term("window", EventTimeTumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+ term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
term("select",
"COUNT(*) AS EXPR$0, " +
"weightedAvg(c, a) AS wAvg, " +
@@ -113,23 +115,23 @@ class WindowAggregateTest extends TableTestBase {
}
@Test
+ @Ignore // TODO enable once CALCITE-1761 is fixed
def testHoppingFunction() = {
streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
val sql =
"SELECT COUNT(*), weightedAvg(c, a) AS wAvg, " +
- " HOP_START(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR), " +
- " HOP_END(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR) " +
+ " HOP_START(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR), " +
+ " HOP_END(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR) " +
"FROM MyTable " +
- "GROUP BY HOP(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"
+ "GROUP BY HOP(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"
val expected =
unaryNode(
"DataStreamCalc",
unaryNode(
"DataStreamAggregate",
streamTableNode(0),
- term("window", ProcessingTimeSlidingGroupWindow('w$,
- 3600000.millis, 900000.millis)),
+ term("window", SlidingGroupWindow('w$, 'rowtime, 3600000.millis, 900000.millis)),
term("select",
"COUNT(*) AS EXPR$0, " +
"weightedAvg(c, a) AS wAvg, " +
@@ -142,23 +144,24 @@ class WindowAggregateTest extends TableTestBase {
}
@Test
+ @Ignore // TODO enable once CALCITE-1761 is fixed
def testSessionFunction() = {
streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
val sql =
"SELECT " +
" COUNT(*), weightedAvg(c, a) AS wAvg, " +
- " SESSION_START(proctime(), INTERVAL '15' MINUTE), " +
- " SESSION_END(proctime(), INTERVAL '15' MINUTE) " +
+ " SESSION_START(proctime, INTERVAL '15' MINUTE), " +
+ " SESSION_END(proctime, INTERVAL '15' MINUTE) " +
"FROM MyTable " +
- "GROUP BY SESSION(proctime(), INTERVAL '15' MINUTE)"
+ "GROUP BY SESSION(proctime, INTERVAL '15' MINUTE)"
val expected =
unaryNode(
"DataStreamCalc",
unaryNode(
"DataStreamAggregate",
streamTableNode(0),
- term("window", ProcessingTimeSessionGroupWindow('w$, 900000.millis)),
+ term("window", SessionGroupWindow('w$, 'rowtime, 900000.millis)),
term("select",
"COUNT(*) AS EXPR$0, " +
"weightedAvg(c, a) AS wAvg, " +
@@ -175,7 +178,7 @@ class WindowAggregateTest extends TableTestBase {
val sqlQuery =
"SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
"FROM MyTable " +
- "GROUP BY TUMBLE(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')"
+ "GROUP BY TUMBLE(proctime, INTERVAL '2' HOUR, TIME '10:00:00')"
streamUtil.verifySql(sqlQuery, "n/a")
}
@@ -185,7 +188,7 @@ class WindowAggregateTest extends TableTestBase {
val sqlQuery =
"SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
"FROM MyTable " +
- "GROUP BY HOP(proctime(), INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME '10:00:00')"
+ "GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME '10:00:00')"
streamUtil.verifySql(sqlQuery, "n/a")
}
@@ -195,21 +198,21 @@ class WindowAggregateTest extends TableTestBase {
val sqlQuery =
"SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
"FROM MyTable " +
- "GROUP BY SESSION(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')"
+ "GROUP BY SESSION(proctime, INTERVAL '2' HOUR, TIME '10:00:00')"
streamUtil.verifySql(sqlQuery, "n/a")
}
@Test(expected = classOf[TableException])
def testVariableWindowSize() = {
- val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime(), c * INTERVAL '1' MINUTE)"
+ val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, c * INTERVAL '1' MINUTE)"
streamUtil.verifySql(sql, "n/a")
}
@Test(expected = classOf[TableException])
def testMultiWindow() = {
val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " +
- "FLOOR(rowtime() TO HOUR), FLOOR(rowtime() TO MINUTE)"
+ "FLOOR(rowtime TO HOUR), FLOOR(rowtime TO MINUTE)"
val expected = ""
streamUtil.verifySql(sql, expected)
}
@@ -237,8 +240,8 @@ class WindowAggregateTest extends TableTestBase {
def testUnboundPartitionedProcessingWindowWithRange() = {
val sql = "SELECT " +
"c, " +
- "count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
+ "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
"from MyTable"
val expected =
@@ -249,12 +252,12 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "a", "c", "PROCTIME() AS $2")
+ term("select", "a", "c", "proctime")
),
term("partitionBy", "c"),
- term("orderBy", "PROCTIME"),
+ term("orderBy", "proctime"),
term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+ term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
),
term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
)
@@ -265,7 +268,7 @@ class WindowAggregateTest extends TableTestBase {
def testUnboundPartitionedProcessingWindowWithRow() = {
val sql = "SELECT " +
"c, " +
- "count(a) OVER (PARTITION BY c ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND " +
+ "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
"CURRENT ROW) as cnt1 " +
"from MyTable"
@@ -274,15 +277,11 @@ class WindowAggregateTest extends TableTestBase {
"DataStreamCalc",
unaryNode(
"DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "PROCTIME() AS $2")
- ),
+ streamTableNode(0),
term("partitionBy", "c"),
- term("orderBy", "PROCTIME"),
+ term("orderBy", "proctime"),
term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+ term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
),
term("select", "c", "w0$o0 AS $1")
)
@@ -293,8 +292,8 @@ class WindowAggregateTest extends TableTestBase {
def testUnboundNonPartitionedProcessingWindowWithRange() = {
val sql = "SELECT " +
"c, " +
- "count(a) OVER (ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " +
+ "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " +
"from MyTable"
val expected =
@@ -305,11 +304,11 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "a", "c", "PROCTIME() AS $2")
+ term("select", "a", "c", "proctime")
),
- term("orderBy", "PROCTIME"),
+ term("orderBy", "proctime"),
term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+ term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
),
term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
)
@@ -320,7 +319,7 @@ class WindowAggregateTest extends TableTestBase {
def testUnboundNonPartitionedProcessingWindowWithRow() = {
val sql = "SELECT " +
"c, " +
- "count(a) OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND " +
+ "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " +
"CURRENT ROW) as cnt1 " +
"from MyTable"
@@ -329,14 +328,10 @@ class WindowAggregateTest extends TableTestBase {
"DataStreamCalc",
unaryNode(
"DataStreamOverAggregate",
- unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "c", "PROCTIME() AS $2")
- ),
- term("orderBy", "PROCTIME"),
+ streamTableNode(0),
+ term("orderBy", "proctime"),
term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+ term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0")
),
term("select", "c", "w0$o0 AS $1")
)
@@ -347,8 +342,8 @@ class WindowAggregateTest extends TableTestBase {
def testUnboundNonPartitionedEventTimeWindowWithRange() = {
val sql = "SELECT " +
"c, " +
- "count(a) OVER (ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt2 " +
+ "count(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
"from MyTable"
val expected =
@@ -359,11 +354,11 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "a", "c", "ROWTIME() AS $2")
+ term("select", "a", "c", "rowtime")
),
- term("orderBy", "ROWTIME"),
+ term("orderBy", "rowtime"),
term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+ term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
),
term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
)
@@ -374,8 +369,8 @@ class WindowAggregateTest extends TableTestBase {
def testUnboundPartitionedEventTimeWindowWithRange() = {
val sql = "SELECT " +
"c, " +
- "count(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt1, " +
- "sum(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt2 " +
+ "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " +
+ "sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " +
"from MyTable"
val expected =
@@ -386,12 +381,12 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "a", "c", "ROWTIME() AS $2")
+ term("select", "a", "c", "rowtime")
),
term("partitionBy", "c"),
- term("orderBy", "ROWTIME"),
+ term("orderBy", "rowtime"),
term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
+ term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1")
),
term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2")
)
@@ -402,7 +397,7 @@ class WindowAggregateTest extends TableTestBase {
def testBoundPartitionedRowTimeWindowWithRow() = {
val sql = "SELECT " +
"c, " +
- "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 5 preceding AND " +
+ "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
"CURRENT ROW) as cnt1 " +
"from MyTable"
@@ -414,12 +409,12 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "a", "c", "ROWTIME() AS $2")
+ term("select", "a", "c", "rowtime")
),
term("partitionBy", "c"),
- term("orderBy", "ROWTIME"),
+ term("orderBy", "rowtime"),
term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+ term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
),
term("select", "c", "w0$o0 AS $1")
)
@@ -430,7 +425,7 @@ class WindowAggregateTest extends TableTestBase {
def testBoundNonPartitionedRowTimeWindowWithRow() = {
val sql = "SELECT " +
"c, " +
- "count(a) OVER (ORDER BY RowTime() ROWS BETWEEN 5 preceding AND " +
+ "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 preceding AND " +
"CURRENT ROW) as cnt1 " +
"from MyTable"
@@ -442,11 +437,11 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "a", "c", "ROWTIME() AS $2")
+ term("select", "a", "c", "rowtime")
),
- term("orderBy", "ROWTIME"),
+ term("orderBy", "rowtime"),
term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+ term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
),
term("select", "c", "w0$o0 AS $1")
)
@@ -457,7 +452,7 @@ class WindowAggregateTest extends TableTestBase {
def testBoundPartitionedRowTimeWindowWithRange() = {
val sql = "SELECT " +
"c, " +
- "count(a) OVER (PARTITION BY c ORDER BY RowTime() " +
+ "count(a) OVER (PARTITION BY c ORDER BY rowtime " +
"RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " +
"from MyTable"
@@ -469,12 +464,12 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "a", "c", "ROWTIME() AS $2")
+ term("select", "a", "c", "rowtime")
),
term("partitionBy", "c"),
- term("orderBy", "ROWTIME"),
+ term("orderBy", "rowtime"),
term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+ term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
),
term("select", "c", "w0$o0 AS $1")
)
@@ -485,7 +480,7 @@ class WindowAggregateTest extends TableTestBase {
def testBoundNonPartitionedRowTimeWindowWithRange() = {
val sql = "SELECT " +
"c, " +
- "count(a) OVER (ORDER BY RowTime() " +
+ "count(a) OVER (ORDER BY rowtime " +
"RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " +
"from MyTable"
@@ -497,11 +492,11 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "a", "c", "ROWTIME() AS $2")
+ term("select", "a", "c", "rowtime")
),
- term("orderBy", "ROWTIME"),
+ term("orderBy", "rowtime"),
term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+ term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
),
term("select", "c", "w0$o0 AS $1")
)
@@ -512,7 +507,7 @@ class WindowAggregateTest extends TableTestBase {
def testBoundNonPartitionedProcTimeWindowWithRowRange() = {
val sql = "SELECT " +
"c, " +
- "count(a) OVER (ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+ "count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
"CURRENT ROW) as cnt1 " +
"from MyTable"
@@ -524,11 +519,11 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "a", "c", "PROCTIME() AS $2")
+ term("select", "a", "c", "proctime")
),
- term("orderBy", "PROCTIME"),
+ term("orderBy", "proctime"),
term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+ term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
),
term("select", "c", "w0$o0 AS $1")
)
@@ -539,7 +534,7 @@ class WindowAggregateTest extends TableTestBase {
def testBoundPartitionedProcTimeWindowWithRowRange() = {
val sql = "SELECT " +
"c, " +
- "count(a) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+ "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " +
"CURRENT ROW) as cnt1 " +
"from MyTable"
@@ -551,12 +546,12 @@ class WindowAggregateTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "a", "c", "PROCTIME() AS $2")
+ term("select", "a", "c", "proctime")
),
term("partitionBy", "c"),
- term("orderBy", "PROCTIME"),
+ term("orderBy", "proctime"),
term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
- term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+ term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
),
term("select", "c", "w0$o0 AS $1")
)
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
index 3651749..4a6a616 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
@@ -57,13 +57,13 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
StreamITCase.testResults = mutable.MutableList()
val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
+ val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
val countFun = new CountAggFunction
val weightAvgFun = new WeightedAvg
val windowedTable = table
- .window(Slide over 2.rows every 1.rows as 'w)
+ .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
.groupBy('w, 'string)
.select('string, countFun('int), 'int.avg,
weightAvgFun('long, 'int), weightAvgFun('int, 'int))
@@ -102,7 +102,7 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
val stream = env
.fromCollection(sessionWindowTestdata)
.assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(10L))
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
+ val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
val windowedTable = table
.window(Session withGap 5.milli on 'rowtime as 'w)
@@ -126,12 +126,12 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
StreamITCase.testResults = mutable.MutableList()
val stream = env.fromCollection(data)
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
+ val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
val countFun = new CountAggFunction
val weightAvgFun = new WeightedAvg
val windowedTable = table
- .window(Tumble over 2.rows as 'w)
+ .window(Tumble over 2.rows on 'proctime as 'w)
.groupBy('w)
.select(countFun('string), 'int.avg,
weightAvgFun('long, 'int), weightAvgFun('int, 'int))
@@ -154,7 +154,7 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase {
val stream = env
.fromCollection(data)
.assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(0L))
- val table = stream.toTable(tEnv, 'long, 'int, 'string)
+ val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
val countFun = new CountAggFunction
val weightAvgFun = new WeightedAvg
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
index 5969e91..1114cf0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
@@ -111,22 +111,6 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
}
@Test(expected = classOf[TableException])
- def testAsWithToFewFields(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b)
-
- val results = ds.toDataStream[Row]
- results.addSink(new StreamITCase.StringSink)
- env.execute()
-
- val expected = mutable.MutableList("no")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test(expected = classOf[TableException])
def testAsWithToManyFields(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment