Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:33 UTC

[04/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
new file mode 100644
index 0000000..f8d9c92
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.batch.table
+
+import org.apache.flink.api.java.{DataSet => JDataSet, ExecutionEnvironment => JavaExecutionEnv}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment => ScalaExecutionEnv, _}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{PojoTableFunc, TableFunc2, _}
+import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.junit.Test
+import org.mockito.Mockito._
+
+class UserDefinedTableFunctionTest extends TableTestBase {
+
+  @Test
+  def testJavaScalaTableAPIEquality(): Unit = {
+    // mock
+    val ds = mock(classOf[DataSet[Row]])
+    val jDs = mock(classOf[JDataSet[Row]])
+    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
+    when(ds.javaSet).thenReturn(jDs)
+    when(jDs.getType).thenReturn(typeInfo)
+
+    // Scala environment
+    val env = mock(classOf[ScalaExecutionEnv])
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
+
+    // Java environment
+    val javaEnv = mock(classOf[JavaExecutionEnv])
+    val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
+    val in2 = javaTableEnv.fromDataSet(jDs).as("a, b, c")
+    javaTableEnv.registerTable("MyTable", in2)
+
+    // test cross join
+    val func1 = new TableFunc1
+    javaTableEnv.registerFunction("func1", func1)
+    var scalaTable = in1.join(func1('c) as 's).select('c, 's)
+    var javaTable = in2.join("func1(c).as(s)").select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test left outer join
+    scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
+    javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test overloading
+    scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
+    javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test custom result type
+    val func2 = new TableFunc2
+    javaTableEnv.registerFunction("func2", func2)
+    scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
+    javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test hierarchy generic type
+    val hierarchy = new HierarchyTableFunction
+    javaTableEnv.registerFunction("hierarchy", hierarchy)
+    scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
+      .select('c, 'name, 'len, 'adult)
+    javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
+      .select("c, name, len, adult")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test pojo type
+    val pojo = new PojoTableFunc
+    javaTableEnv.registerFunction("pojo", pojo)
+    scalaTable = in1.join(pojo('c))
+      .select('c, 'name, 'age)
+    javaTable = in2.join("pojo(c)")
+      .select("c, name, age")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test with filter
+    scalaTable = in1.join(func2('c) as ('name, 'len))
+      .select('c, 'name, 'len).filter('len > 2)
+    javaTable = in2.join("func2(c) as (name, len)")
+      .select("c, name, len").filter("len > 2")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test with scalar function
+    scalaTable = in1.join(func1('c.substring(2)) as 's)
+      .select('a, 'c, 's)
+    javaTable = in2.join("func1(substring(c, 2)) as (s)")
+      .select("a, c, s")
+    verifyTableEquals(scalaTable, javaTable)
+  }
+
+  @Test
+  def testCrossJoin(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("func1", new TableFunc1)
+
+    val result1 = table.join(function('c) as 's).select('c, 's)
+
+    val expected1 = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", s"$function($$2)"),
+        term("function", function),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "s")
+    )
+
+    util.verifyTable(result1, expected1)
+
+    // test overloading
+
+    val result2 = table.join(function('c, "$") as 's).select('c, 's)
+
+    val expected2 = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", s"$function($$2, '$$')"),
+        term("function", function),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "s")
+    )
+
+    util.verifyTable(result2, expected2)
+  }
+
+  @Test
+  def testLeftOuterJoin(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("func1", new TableFunc1)
+
+    val result = table.leftOuterJoin(function('c) as 's).select('c, 's)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", s"$function($$2)"),
+        term("function", function),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "LEFT")
+      ),
+      term("select", "c", "s")
+    )
+
+    util.verifyTable(result, expected)
+  }
+}
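
The tests above call TableFunc1 without showing its definition; the helper lives in org.apache.flink.table.utils and is outside this hunk. A minimal sketch of such a table function against the refactored org.apache.flink.table package, assuming the split-on-'#' and prefix-overload semantics that the overloading test implies:

    import org.apache.flink.table.functions.TableFunction

    // Hypothetical re-creation of the TableFunc1 helper referenced above.
    // eval() methods are resolved by reflection; each collect() emits a row.
    class TableFunc1 extends TableFunction[String] {
      def eval(str: String): Unit =
        str.split("#").foreach(collect)

      // Overload exercised by join("func1(c, '$') as (s)") above.
      def eval(str: String, prefix: String): Unit =
        str.split("#").foreach(s => collect(prefix + s))
    }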

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/SortTestUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/SortTestUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/SortTestUtils.scala
new file mode 100644
index 0000000..ef425d3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/SortTestUtils.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.utils
+
+object SortTestUtils {
+
+  val tupleDataSetStrings = List((1, 1L, "Hi")
+    ,(2, 2L, "Hello")
+    ,(3, 2L, "Hello world")
+    ,(4, 3L, "Hello world, how are you?")
+    ,(5, 3L, "I am fine.")
+    ,(6, 3L, "Luke Skywalker")
+    ,(7, 4L, "Comment#1")
+    ,(8, 4L, "Comment#2")
+    ,(9, 4L, "Comment#3")
+    ,(10, 4L, "Comment#4")
+    ,(11, 5L, "Comment#5")
+    ,(12, 5L, "Comment#6")
+    ,(13, 5L, "Comment#7")
+    ,(14, 5L, "Comment#8")
+    ,(15, 5L, "Comment#9")
+    ,(16, 6L, "Comment#10")
+    ,(17, 6L, "Comment#11")
+    ,(18, 6L, "Comment#12")
+    ,(19, 6L, "Comment#13")
+    ,(20, 6L, "Comment#14")
+    ,(21, 6L, "Comment#15"))
+
+  def sortExpectedly(dataSet: List[Product])
+                    (implicit ordering: Ordering[Product]): String = 
+    sortExpectedly(dataSet, 0, dataSet.length)
+
+  def sortExpectedly(dataSet: List[Product], start: Int, end: Int)
+                    (implicit ordering: Ordering[Product]): String = {
+    dataSet
+      .sorted(ordering)
+      .slice(start, end)
+      .mkString("\n")
+      .replaceAll("[\\(\\)]", "")
+  }
+
+}
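
sortExpectedly leaves the sort order to an implicit Ordering[Product] supplied by the calling test. A hypothetical caller ordering tupleDataSetStrings by the first tuple field:

    import org.apache.flink.table.api.scala.batch.utils.SortTestUtils._

    // Order by the first (Int) field and render the expected output the way
    // the ITCases compare it: one row per line, parentheses stripped.
    implicit val ordering: Ordering[Product] =
      Ordering.by((p: Product) => p.productElement(0).asInstanceOf[Int])

    val expected: String = sortExpectedly(tupleDataSetStrings)
    // "1,1,Hi\n2,2,Hello\n3,2,Hello world\n..."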

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
new file mode 100644
index 0000000..45315d6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.utils
+
+import java.util
+
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.{EFFICIENT, NO_NULL, TableConfigMode}
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConversions._
+
+class TableProgramsTestBase(
+    mode: TestExecutionMode,
+    tableConfigMode: TableConfigMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  def config: TableConfig = {
+    val conf = new TableConfig
+    tableConfigMode match {
+      case NO_NULL =>
+        conf.setNullCheck(false)
+      case EFFICIENT =>
+        conf.setEfficientTypeUsage(true)
+      case _ => // keep default
+    }
+    conf
+  }
+}
+
+object TableProgramsTestBase {
+  case class TableConfigMode(nullCheck: Boolean, efficientTypes: Boolean)
+
+  val DEFAULT = TableConfigMode(nullCheck = true, efficientTypes = false)
+  val NO_NULL = TableConfigMode(nullCheck = false, efficientTypes = false)
+  val EFFICIENT = TableConfigMode(nullCheck = false, efficientTypes = true)
+
+  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+  def parameters(): util.Collection[Array[java.lang.Object]] = {
+    Seq[Array[AnyRef]](Array(TestExecutionMode.COLLECTION, DEFAULT))
+  }
+}
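
A concrete ITCase subclasses this base and lets the JUnit Parameterized runner inject both constructor arguments; parameters() is picked up via the companion object's static forwarder. A sketch with a hypothetical test class, assuming the two-argument TableEnvironment.getTableEnvironment(env, config) overload of this API version:

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
    import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
    import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
    import org.junit.Test
    import org.junit.runner.RunWith
    import org.junit.runners.Parameterized

    @RunWith(classOf[Parameterized])
    class ExampleTableITCase(mode: TestExecutionMode, configMode: TableConfigMode)
      extends TableProgramsTestBase(mode, configMode) {

      @Test
      def testWithConfiguredEnvironment(): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // config honors the injected TableConfigMode (null checks, type usage).
        val tEnv = TableEnvironment.getTableEnvironment(env, config)
        // ... build and verify a table program here
      }
    }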

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/ExplainStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/ExplainStreamTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/ExplainStreamTest.scala
new file mode 100644
index 0000000..7b168bb
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/ExplainStreamTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.junit.Assert.assertEquals
+import org.junit._
+
+class ExplainStreamTest
+  extends StreamingMultipleProgramsTestBase {
+
+  val testFilePath = ExplainStreamTest.this.getClass.getResource("/").getFile
+
+  @Test
+  def testFilter(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements((1, "hello"))
+      .toTable(tEnv, 'a, 'b)
+      .filter("a % 2 = 0")
+
+    val result = replaceString(tEnv.explain(table))
+
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testFilterStream0.out").mkString
+    val expect = replaceString(source)
+    assertEquals(expect, result)
+  }
+
+  @Test
+  def testUnion(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table = table1.unionAll(table2)
+
+    val result = replaceString(tEnv.explain(table))
+
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testUnionStream0.out").mkString
+    val expect = replaceString(source)
+    assertEquals(expect, result)
+  }
+
+  def replaceString(s: String): String = {
+    /* Stage {id} is ignored because the id keeps incrementing within the
+     * test class while the StreamExecutionEnvironment is up.
+     */
+    s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
+  }
+}
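
replaceString normalizes the plan text precisely because of those growing stage ids; a quick worked example on an illustrative plan fragment:

    // Illustrative input: stage ids and CRLF line endings are removed so the
    // explain output can be compared against the checked-in *.out resources.
    val raw  = "Stage 3 : Data Source\r\n"
    val norm = raw.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
    // norm == " : Data Source\n"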

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
new file mode 100644
index 0000000..cdc4329
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import java.io.File
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Test
+
+class TableSinkITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test
+  def testStreamTableSink(): Unit = {
+
+    val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
+    tmpFile.deleteOnExit()
+    val path = tmpFile.toURI.toString
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(4)
+
+    val input = StreamTestData.get3TupleDataStream(env)
+      .map(x => x).setParallelism(4) // increase DOP to 4
+
+    val results = input.toTable(tEnv, 'a, 'b, 'c)
+      .where('a < 5 || 'a > 17)
+      .select('c, 'b)
+      .writeToSink(new CsvTableSink(path))
+
+    env.execute()
+
+    val expected = Seq(
+      "Hi,1", "Hello,2", "Hello world,2", "Hello world, how are you?,3",
+      "Comment#12,6", "Comment#13,6", "Comment#14,6", "Comment#15,6").mkString("\n")
+
+    TestBaseUtils.compareResultsByLinesInMemory(expected, path)
+  }
+
+}
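
Why exactly those eight rows: get3TupleDataStream produces the same 21 records as SortTestUtils.tupleDataSetStrings earlier in this commit, so the predicate can be checked by hand:

    // 'a < 5 || 'a > 17 keeps a in {1, 2, 3, 4} and {18, 19, 20, 21}, i.e.
    // "Hi", "Hello", "Hello world", "Hello world, how are you?" plus
    // "Comment#12" .. "Comment#15" -- matching the expected CSV lines above.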

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
new file mode 100644
index 0000000..ce910db
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import java.io.{File, FileOutputStream, OutputStreamWriter}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.{CsvTableSource, StreamTableSource}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class TableSourceITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test
+  def testStreamTableSourceTableAPI(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33))
+    tEnv.ingest("MyTestTable")
+      .where('amount < 4)
+      .select('amount * 'id, 'name)
+      .toDataStream[Row]
+      .addSink(new StreamITCase.StringSink)
+
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "0,Record_0", "0,Record_16", "0,Record_32",
+      "1,Record_1", "17,Record_17", "36,Record_18",
+      "4,Record_2", "57,Record_19", "9,Record_3")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testStreamTableSourceSQL(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33))
+    tEnv.sql(
+      "SELECT amount * id, name FROM MyTestTable WHERE amount < 4")
+      .toDataStream[Row]
+      .addSink(new StreamITCase.StringSink)
+
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "0,Record_0", "0,Record_16", "0,Record_32",
+      "1,Record_1", "17,Record_17", "36,Record_18",
+      "4,Record_2", "57,Record_19", "9,Record_3")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testCsvTableSource(): Unit = {
+
+    val csvRecords = Seq(
+      "First#Id#Score#Last",
+      "Mike#1#12.3#Smith",
+      "Bob#2#45.6#Taylor",
+      "Sam#3#7.89#Miller",
+      "Peter#4#0.12#Smith",
+      "% Just a comment",
+      "Liz#5#34.5#Williams",
+      "Sally#6#6.78#Miller",
+      "Alice#7#90.1#Smith",
+      "Kelly#8#2.34#Williams"
+    )
+
+    val tempFile = File.createTempFile("csv-test", "tmp")
+    tempFile.deleteOnExit()
+    val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), "UTF-8")
+    tmpWriter.write(csvRecords.mkString("$"))
+    tmpWriter.close()
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val csvTable = new CsvTableSource(
+      tempFile.getAbsolutePath,
+      Array("first", "id", "score", "last"),
+      Array(
+        BasicTypeInfo.STRING_TYPE_INFO,
+        BasicTypeInfo.INT_TYPE_INFO,
+        BasicTypeInfo.DOUBLE_TYPE_INFO,
+        BasicTypeInfo.STRING_TYPE_INFO
+      ),
+      fieldDelim = "#",
+      rowDelim = "$",
+      ignoreFirstLine = true,
+      ignoreComments = "%"
+    )
+
+    tEnv.registerTableSource("csvTable", csvTable)
+    tEnv.sql(
+      "SELECT last, score, id FROM csvTable WHERE id < 4 ")
+      .toDataStream[Row]
+      .addSink(new StreamITCase.StringSink)
+
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Smith,12.3,1",
+      "Taylor,45.6,2",
+      "Miller,7.89,3")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}
+
+
+class TestStreamTableSource(val numRecords: Int) extends StreamTableSource[Row] {
+
+  val fieldTypes: Array[TypeInformation[_]] = Array(
+    BasicTypeInfo.STRING_TYPE_INFO,
+    BasicTypeInfo.LONG_TYPE_INFO,
+    BasicTypeInfo.INT_TYPE_INFO
+  )
+
+  /** Returns the data of the table as a [[DataStream]]. */
+  override def getDataStream(execEnv: environment.StreamExecutionEnvironment): DataStream[Row] = {
+    execEnv.addSource(new GeneratingSourceFunction(numRecords), getReturnType).setParallelism(1)
+  }
+
+  /** Returns the types of the table fields. */
+  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
+
+  /** Returns the names of the table fields. */
+  override def getFieldsNames: Array[String] = Array("name", "id", "amount")
+
+  /** Returns the [[TypeInformation]] for the return type. */
+  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
+
+  /** Returns the number of fields of the table. */
+  override def getNumberOfFields: Int = 3
+}
+
+class GeneratingSourceFunction(val num: Long) extends SourceFunction[Row] {
+
+  var running = true
+
+  override def run(ctx: SourceContext[Row]): Unit = {
+    var cnt = 0L
+    while(running && cnt < num) {
+      val out = new Row(3)
+      out.setField(0, s"Record_$cnt")
+      out.setField(1, cnt)
+      out.setField(2, (cnt % 16).toInt)
+
+      ctx.collect(out)
+      cnt += 1
+    }
+  }
+
+  override def cancel(): Unit = {
+    running = false
+  }
+}
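
The expected lists in the two streaming-source tests fall out of GeneratingSourceFunction: id is cnt, amount is cnt % 16, and 33 records are generated, so WHERE amount < 4 matches exactly nine of them. A small check:

    // amount = cnt % 16, id = cnt, name = s"Record_$cnt", numRecords = 33
    val matching = (0L until 33L).filter(cnt => (cnt % 16) < 4)
    // -> 0, 1, 2, 3, 16, 17, 18, 19, 32: nine records; amount * id for
    // cnt = 17, 18, 19 yields 17, 36, 57, as asserted above.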

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
new file mode 100644
index 0000000..97e76fa
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class SqlITCase extends StreamingMultipleProgramsTestBase {
+
+  /** test selection **/
+  @Test
+  def testSelectExpressionFromTable(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT a * 2, b - 1 FROM MyTable"
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("2,0", "4,1", "6,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test filtering with registered table **/
+  @Test
+  def testSimpleFilter(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE a = 3"
+
+    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", t)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test filtering with registered datastream **/
+  @Test
+  def testDatastreamFilter(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
+
+    val t = StreamTestData.getSmall3TupleDataStream(env)
+    tEnv.registerDataStream("MyTable", t)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test union with registered tables **/
+  @Test
+  def testUnion(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT * FROM T1 " +
+      "UNION ALL " +
+      "SELECT * FROM T2"
+
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("T1", t1)
+    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1,Hi", "1,1,Hi",
+      "2,2,Hello", "2,2,Hello",
+      "3,2,Hello world", "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test union with filter **/
+  @Test
+  def testUnionWithFilter(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT * FROM T1 WHERE a = 3 " +
+      "UNION ALL " +
+      "SELECT * FROM T2 WHERE a = 2"
+
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("T1", t1)
+    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("T2", t2)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "2,2,Hello",
+      "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** test union of a table and a datastream **/
+  @Test
+  def testUnionTableWithDataStream(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT c FROM T1 WHERE a = 3 " +
+      "UNION ALL " +
+      "SELECT c FROM T2 WHERE a = 2"
+
+    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("T1", t1)
+    val t2 = StreamTestData.get3TupleDataStream(env)
+    tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("Hello", "Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}
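
One detail worth noting, reusing the identifiers from the tests above: testDatastreamFilter registers the DataStream without field aliases, so the tuple fields surface under their positional names, while testUnionTableWithDataStream renames them on registration:

    // Without aliases, Tuple3 fields are queryable as _1, _2, _3:
    tEnv.registerDataStream("MyTable", t)
    tEnv.sql("SELECT * FROM MyTable WHERE _1 = 3")

    // With aliases, the same stream exposes a, b, c instead:
    tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)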

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
new file mode 100644
index 0000000..58eedd0
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2}
+import org.apache.flink.table.utils._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class UserDefinedTableFunctionTest extends TableTestBase {
+
+  @Test
+  def testCrossJoin(): Unit = {
+    val util = streamTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "func1($cor0.c)"),
+        term("function", func1.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+
+    // test overloading
+
+    val sqlQuery2 = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c, '$')) AS T(s)"
+
+    val expected2 = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "func1($cor0.c, '$')"),
+        term("function", func1.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery2, expected2)
+  }
+
+  @Test
+  def testLeftOuterJoin(): Unit = {
+    val util = streamTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "func1($cor0.c)"),
+        term("function", func1.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+        term("joinType", "LEFT")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testCustomType(): Unit = {
+    val util = streamTestUtil()
+    val func2 = new TableFunc2
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func2", func2)
+
+    val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len)"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "func2($cor0.c)"),
+        term("function", func2.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+               "VARCHAR(2147483647) f0, INTEGER f1)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS name", "f1 AS len")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testHierarchyType(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = new HierarchyTableFunction
+    util.addFunction("hierarchy", function)
+
+    val sqlQuery = "SELECT c, T.* FROM MyTable, LATERAL TABLE(hierarchy(c)) AS T(name, adult, len)"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "hierarchy($cor0.c)"),
+        term("function", function.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
+               " VARCHAR(2147483647) f0, BOOLEAN f1, INTEGER f2)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS name", "f1 AS adult", "f2 AS len")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testPojoType(): Unit = {
+    val util = streamTestUtil()
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = new PojoTableFunc
+    util.addFunction("pojo", function)
+
+    val sqlQuery = "SELECT c, name, age FROM MyTable, LATERAL TABLE(pojo(c))"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "pojo($cor0.c)"),
+        term("function", function.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
+               " INTEGER age, VARCHAR(2147483647) name)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "name", "age")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testFilter(): Unit = {
+    val util = streamTestUtil()
+    val func2 = new TableFunc2
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func2", func2)
+
+    val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len) " +
+      "WHERE len > 2"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "func2($cor0.c)"),
+        term("function", func2.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+               "VARCHAR(2147483647) f0, INTEGER f1)"),
+        term("joinType", "INNER"),
+        term("condition", ">($1, 2)")
+      ),
+      term("select", "c", "f0 AS name", "f1 AS len")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testScalarFunction(): Unit = {
+    val util = streamTestUtil()
+    val func1 = new TableFunc1
+    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    util.addFunction("func1", func1)
+
+    val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(SUBSTRING(c, 2))) AS T(s)"
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
+        term("function", func1.getClass.getCanonicalName),
+        term("rowType",
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "f0 AS s")
+    )
+
+    util.verifySql(sqlQuery, expected)
+  }
+}
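
The f0/f1 field names in the expected rowType come from the function's declared result type, not from the SQL aliases; the trailing DataStreamCalc renames them ("f0 AS name", "f1 AS len"). TableFunc2 itself is defined in org.apache.flink.table.utils; a hypothetical sketch of its shape, assuming the name/length semantics implied by testCustomType:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.table.api.Types
    import org.apache.flink.table.functions.TableFunction
    import org.apache.flink.types.Row

    // Hypothetical re-creation: emits one (name, length) row per '#'-token
    // and pins the result type, which is what surfaces as fields f0 and f1.
    class TableFunc2 extends TableFunction[Row] {
      def eval(str: String): Unit =
        str.split("#").foreach { s =>
          val row = new Row(2)
          row.setField(0, s)
          row.setField(1, s.length)
          collect(row)
        }

      override def getResultType: TypeInformation[Row] =
        new RowTypeInfo(Types.STRING, Types.INT)
    }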

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
new file mode 100644
index 0000000..f41cae1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+  * We only test some aggregations until better testing of constructed DataStream
+  * programs is possible.
+  */
+class AggregationsITCase extends StreamingMultipleProgramsTestBase {
+
+  val data = List(
+    (1L, 1, "Hi"),
+    (2L, 2, "Hello"),
+    (4L, 2, "Hello"),
+    (8L, 3, "Hello world"),
+    (16L, 3, "Hello world"))
+
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Slide over 2.rows every 1.rows)
+      .select('string, 'int.count, 'int.avg)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("Hello world,1,3", "Hello world,2,3", "Hello,1,2", "Hello,2,2", "Hi,1,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSessionGroupWindowOverTime(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Session withGap 7.milli on 'rowtime)
+      .select('string, 'int.count)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("Hello world,1", "Hello world,1", "Hello,2", "Hi,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 2.rows)
+      .select('int.count)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("2", "2")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeTumblingWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .select('string, 'int.count, 'int.avg, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hello world,1,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
+      "Hello world,1,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
+      "Hello,2,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hi,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeSlidingWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
+      .select('string, 'int.count, 'w.start, 'w.end, 'w.start)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0",
+      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015,1970-01-01 00:00:00.005",
+      "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02,1970-01-01 00:00:00.01",
+      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025,1970-01-01 00:00:00.015",
+      "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995",
+      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0",
+      "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}
+
+object GroupWindowITCase {
+  class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
+
+    override def checkAndGetNextWatermark(
+        lastElement: (Long, Int, String),
+        extractedTimestamp: Long)
+      : Watermark = {
+      new Watermark(extractedTimestamp)
+    }
+
+    override def extractTimestamp(
+        element: (Long, Int, String),
+        previousElementTimestamp: Long): Long = {
+      element._1
+    }
+  }
+}
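
The session-window expectation can be read directly off the data and the 7 ms gap:

    // Session(withGap 7.milli) per key, over the event timestamps in `data`:
    //   Hi          -> {1}          => "Hi,1"
    //   Hello       -> {2, 4}       => "Hello,2"  (4 - 2 = 2ms <= 7ms, merged)
    //   Hello world -> {8} | {16}   => "Hello world,1" twice (16 - 8 = 8ms > 7ms)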

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
new file mode 100644
index 0000000..f541eb4
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Literal
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class CalcITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test
+  def testSimpleSelectAll(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+        "1,1,Hi",
+        "2,2,Hello",
+        "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSelectFirst(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("1", "2", "3")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSimpleSelectWithNaming(): Unit = {
+
+    // verify ProjectMergeRule.
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
+      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
+      .select('a, 'b)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4",
+      "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5",
+      "16,6", "17,6", "18,6", "19,6", "20,6", "21,6")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSimpleSelectAllWithAs(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+      .select('a, 'b, 'c)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+        "1,1,Hi",
+        "2,2,Hello",
+        "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testAsWithTooFewFields(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("no")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testAsWithTooManyFields(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("no")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testAsWithAmbiguousFields(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("no")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testOnlyFieldRefInAs(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("no")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSimpleFilter(): Unit = {
+    /*
+     * Test simple filter
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter('a === 3)
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testAllRejectingFilter(): Unit = {
+    /*
+     * Test all-rejecting filter
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
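+    // a constant false predicate rejects every row, so no results are emitted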
+    val filterDs = ds.filter(Literal(false))
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertEquals(true, StreamITCase.testResults.isEmpty)
+  }
+
+  @Test
+  def testAllPassingFilter(): Unit = {
+    /*
+     * Test all-passing filter
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter(Literal(true))
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+        "1,1,Hi",
+        "2,2,Hello",
+        "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testFilterOnIntegerTupleField(): Unit = {
+    /*
+     * Test filter on Integer tuple field.
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter('a % 2 === 0)
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "2,2,Hello", "4,3,Hello world, how are you?",
+      "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
+      "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
+      "18,6,Comment#12", "20,6,Comment#14")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testNotEquals(): Unit = {
+    /*
+     * Test a not-equals filter on an Integer tuple field.
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter('a % 2 !== 0)
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1,Hi", "3,2,Hello world",
+      "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
+      "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
+      "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
new file mode 100644
index 0000000..ee24cf7
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.expressions.{RowtimeAttribute, WindowReference}
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
+import org.junit.{Ignore, Test}
+
+class GroupWindowTest extends TableTestBase {
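+
+  // Negative tests expect a TableException or ValidationException while the
+  // query is constructed; positive tests translate the Table API query and
+  // compare the resulting logical plan against an expected plan string built
+  // with the unaryNode/term helpers from TableTestUtil.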
+
+  // batch windows are not supported yet
+  @Test(expected = classOf[ValidationException])
+  def testInvalidBatchWindow(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      .window(Session withGap 100.milli as 'string)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWindowProperty(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      .select('string, 'string.start) // property in non windowed table
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidRowtime1(): Unit = {
+    val util = streamTestUtil()
+    // rowtime attribute must not be a field name
+    util.addTable[(Long, Int, String)]('rowtime, 'long, 'int, 'string)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime2(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .select('string, 'int as 'rowtime) // rowtime attribute must not be an alias
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime3(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table.as('rowtime, 'myint, 'mystring) // rowtime attribute must not be an alias
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime4(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      // only rowtime is a valid time attribute in a stream environment
+      .window(Tumble over 50.milli on 'string)
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTumblingSize(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      .window(Tumble over "WRONG") // string is not a valid interval
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSlidingSize(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      .window(Slide over "WRONG" every "WRONG") // string is not a valid interval
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSlidingSlide(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      .window(Slide over 12.rows every 1.minute) // row and time intervals may not be mixed
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSessionGap(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      .window(Session withGap 10.rows) // row interval is not valid for session windows
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWindowAlias1(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      .window(Session withGap 100.milli as 1 + 1) // expression instead of a symbol
+      .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWindowAlias2(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      .window(Session withGap 100.milli as 'string) // field name "string" is already present
+      .select('string, 'int.count)
+  }
+
+  @Test
+  def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Tumble over 50.milli)
+      .select('string, 'int.count)
+
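+    // Expected plan: a DataStreamCalc that projects the accessed fields,
+    // followed by a DataStreamAggregate that groups by 'string and evaluates
+    // COUNT(int) per processing-time tumbling window.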
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int")
+      ),
+      term("groupBy", "string"),
+      term("window", ProcessingTimeTumblingGroupWindow(None, 50.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Tumble over 2.rows)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int")
+      ),
+      term("groupBy", "string"),
+      term("window", ProcessingTimeTumblingGroupWindow(None, 2.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Tumble over 5.milli on 'rowtime)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int")
+      ),
+      term("groupBy", "string"),
+      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 5.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  @Ignore // see comments in DataStreamAggregate
+  def testEventTimeTumblingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Tumble over 2.rows on 'rowtime)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 2.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Slide over 50.milli every 50.milli)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int")
+      ),
+      term("groupBy", "string"),
+      term("window", ProcessingTimeSlidingGroupWindow(None, 50.milli, 50.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Slide over 2.rows every 1.rows)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int")
+      ),
+      term("groupBy", "string"),
+      term("window", ProcessingTimeSlidingGroupWindow(None, 2.rows, 1.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Slide over 8.milli every 10.milli on 'rowtime)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int")
+      ),
+      term("groupBy", "string"),
+      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 8.milli, 10.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  @Ignore // see comments in DataStreamAggregate
+  def testEventTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Slide over 2.rows every 1.rows on 'rowtime)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 2.rows, 1.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testEventTimeSessionGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Session withGap 7.milli on 'rowtime)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int")
+      ),
+      term("groupBy", "string"),
+      term("window", EventTimeSessionGroupWindow(None, RowtimeAttribute(), 7.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Tumble over 50.milli)
+      .select('string, 'int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int")
+      ),
+      term("groupBy", "string"),
+      term("window", ProcessingTimeTumblingGroupWindow(None, 50.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 2.rows)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int")
+      ),
+      term("window", ProcessingTimeTumblingGroupWindow(None, 2.rows)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'rowtime)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int")
+      ),
+      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 5.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  @Ignore // see comments in DataStreamAggregate
+  def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Tumble over 2.rows on 'rowtime)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int")
+      ),
+      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 2.rows)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllProcessingTimeSlidingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 50.milli every 50.milli)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int")
+      ),
+      term("window", ProcessingTimeSlidingGroupWindow(None, 50.milli, 50.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int")
+      ),
+      term("window", ProcessingTimeSlidingGroupWindow(None, 2.rows, 1.rows)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 8.milli every 10.milli on 'rowtime)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int")
+      ),
+      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 8.milli, 10.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  @Ignore // see comments in DataStreamAggregate
+  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows on 'rowtime)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int")
+      ),
+      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 2.rows, 1.rows)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testAllEventTimeSessionGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .window(Session withGap 7.milli on 'rowtime)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int")
+      ),
+      term("window", EventTimeSessionGroupWindow(None, RowtimeAttribute(), 7.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testTumbleWindowStartEnd(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
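+    // 'w.start and 'w.end are window properties; they resolve against the
+    // window alias WindowReference("w") and appear as start(...)/end(...)
+    // expressions in the aggregate's select term.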
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int")
+      ),
+      term("groupBy", "string"),
+      term("window",
+        EventTimeTumblingGroupWindow(
+          Some(WindowReference("w")),
+          RowtimeAttribute(),
+          5.milli)),
+      term("select",
+        "string",
+        "COUNT(int) AS TMP_0",
+        "start(WindowReference(w)) AS TMP_1",
+        "end(WindowReference(w)) AS TMP_2")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testSlideWindowStartEnd(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "string", "int")
+      ),
+      term("groupBy", "string"),
+      term("window",
+        EventTimeSlidingGroupWindow(
+          Some(WindowReference("w")),
+          RowtimeAttribute(),
+          10.milli,
+          5.milli)),
+      term("select",
+        "string",
+        "COUNT(int) AS TMP_0",
+        "start(WindowReference(w)) AS TMP_1",
+        "end(WindowReference(w)) AS TMP_2")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testSessionWindowStartWithTwoEnd(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Session withGap 3.milli on 'rowtime as 'w)
+      .select('w.end as 'we1, 'string, 'int.count as 'cnt, 'w.start as 'ws, 'w.end as 'we2)
+
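+    // The duplicate 'w.end property is evaluated only once (TMP_0) and reused
+    // for both aliases we1 and we2 in the top-level DataStreamCalc.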
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          streamTableNode(0),
+          term("select", "string", "int")
+        ),
+        term("groupBy", "string"),
+        term("window",
+          EventTimeSessionGroupWindow(
+            Some(WindowReference("w")),
+            RowtimeAttribute(),
+            3.milli)),
+        term("select",
+          "string",
+          "COUNT(int) AS TMP_1",
+          "end(WindowReference(w)) AS TMP_0",
+          "start(WindowReference(w)) AS TMP_2")
+      ),
+      term("select", "TMP_0 AS we1", "string", "TMP_1 AS cnt", "TMP_2 AS ws", "TMP_0 AS we2")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
+  def testTumbleWindowWithDuplicateAggsAndProps(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val windowedTable = table
+      .groupBy('string)
+      .window(Tumble over 5.millis on 'rowtime as 'w)
+      .select('string, 'int.sum + 1 as 's1, 'int.sum + 3 as 's2, 'w.start as 'x, 'w.start as 'x2,
+        'w.end as 'x3, 'w.end)
+
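+    // Likewise, the duplicated SUM aggregate and window properties are each
+    // computed once (TMP_0, TMP_1, TMP_2) and shared by the final projection.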
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          streamTableNode(0),
+          term("select", "string", "int")
+        ),
+        term("groupBy", "string"),
+        term("window",
+          EventTimeTumblingGroupWindow(
+            Some(WindowReference("w")),
+            RowtimeAttribute(),
+            5.millis)),
+        term("select",
+          "string",
+          "SUM(int) AS TMP_0",
+          "start(WindowReference(w)) AS TMP_1",
+          "end(WindowReference(w)) AS TMP_2")
+      ),
+      term("select",
+        "string",
+        "+(CAST(AS(TMP_0, 'TMP_3')), CAST(1)) AS s1",
+        "+(CAST(AS(TMP_0, 'TMP_4')), CAST(3)) AS s2",
+        "TMP_1 AS x",
+        "TMP_1 AS x2",
+        "TMP_2 AS x3",
+        "TMP_2 AS TMP_5")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+}