You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:39 UTC

[10/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala
deleted file mode 100644
index 3eee4d4..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.{TableEnvironment, TableException}
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class CalcITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testSimpleSelectAll(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSelectFirst(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("1", "2", "3")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSimpleSelectWithNaming(): Unit = {
-
-    // verify ProjectMergeRule.
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
-      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
-      .select('a, 'b)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4",
-      "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5",
-      "16,6", "17,6", "18,6", "19,6", "20,6", "21,6")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSimpleSelectAllWithAs(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('a, 'b, 'c)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testAsWithToFewFields(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testAsWithToManyFields(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testAsWithAmbiguousFields(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-
-  @Test(expected = classOf[TableException])
-  def testOnlyFieldRefInAs(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSimpleFilter(): Unit = {
-    /*
-     * Test simple filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter('a === 3)
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testAllRejectingFilter(): Unit = {
-    /*
-     * Test all-rejecting filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(false) )
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    assertEquals(true, StreamITCase.testResults.isEmpty)
-  }
-
-  @Test
-  def testAllPassingFilter(): Unit = {
-    /*
-     * Test all-passing filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(true) )
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testFilterOnIntegerTupleField(): Unit = {
-    /*
-     * Test filter on Integer tuple field.
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 === 0 )
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "2,2,Hello", "4,3,Hello world, how are you?",
-      "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
-      "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
-      "18,6,Comment#12", "20,6,Comment#14")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testNotEquals(): Unit = {
-    /*
-     * Test filter on Integer tuple field.
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 !== 0)
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-    val expected = mutable.MutableList(
-      "1,1,Hi", "3,2,Hello world",
-      "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
-      "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
-      "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala
deleted file mode 100644
index 9c2d6b3..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowTest.scala
+++ /dev/null
@@ -1,734 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table._
-import org.apache.flink.api.table.expressions.{RowtimeAttribute, WindowReference}
-import org.apache.flink.api.table.plan.logical._
-import org.apache.flink.api.table.utils.TableTestBase
-import org.apache.flink.api.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
-import org.junit.{Ignore, Test}
-
-class GroupWindowTest extends TableTestBase {
-
-  // batch windows are not supported yet
-  @Test(expected = classOf[ValidationException])
-  def testInvalidBatchWindow(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .groupBy('string)
-      .window(Session withGap 100.milli as 'string)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidWindowProperty(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .groupBy('string)
-      .select('string, 'string.start) // property in non windowed table
-  }
-
-  @Test(expected = classOf[TableException])
-  def testInvalidRowtime1(): Unit = {
-    val util = streamTestUtil()
-    // rowtime attribute must not be a field name
-    util.addTable[(Long, Int, String)]('rowtime, 'long, 'int, 'string)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidRowtime2(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .select('string, 'int as 'rowtime) // rowtime attribute must not be an alias
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidRowtime3(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table.as('rowtime, 'myint, 'mystring) // rowtime attribute must not be an alias
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidRowtime4(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .groupBy('string)
-      // only rowtime is a valid time attribute in a stream environment
-      .window(Tumble over 50.milli on 'string)
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidTumblingSize(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .groupBy('string)
-      .window(Tumble over "WRONG") // string is not a valid interval
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidSlidingSize(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .groupBy('string)
-      .window(Slide over "WRONG" every "WRONG") // string is not a valid interval
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidSlidingSlide(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .groupBy('string)
-      .window(Slide over 12.rows every 1.minute) // row and time intervals may not be mixed
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidSessionGap(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .groupBy('string)
-      .window(Session withGap 10.rows) // row interval is not valid for session windows
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidWindowAlias1(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .groupBy('string)
-      .window(Session withGap 100.milli as 1 + 1) // expression instead of a symbol
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidWindowAlias2(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .groupBy('string)
-      .window(Session withGap 100.milli as 'string) // field name "string" is already present
-      .select('string, 'int.count)
-  }
-
-  @Test
-  def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Tumble over 50.milli)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int")
-      ),
-      term("groupBy", "string"),
-      term("window", ProcessingTimeTumblingGroupWindow(None, 50.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Tumble over 2.rows)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int")
-      ),
-      term("groupBy", "string"),
-      term("window", ProcessingTimeTumblingGroupWindow(None, 2.rows)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeTumblingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Tumble over 5.milli on 'rowtime)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int")
-      ),
-      term("groupBy", "string"),
-      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 5.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  @Ignore // see comments in DataStreamAggregate
-  def testEventTimeTumblingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Tumble over 2.rows on 'rowtime)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      streamTableNode(0),
-      term("groupBy", "string"),
-      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 2.rows)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testProcessingTimeSlidingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Slide over 50.milli every 50.milli)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int")
-      ),
-      term("groupBy", "string"),
-      term("window", ProcessingTimeSlidingGroupWindow(None, 50.milli, 50.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Slide over 2.rows every 1.rows)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int")
-      ),
-      term("groupBy", "string"),
-      term("window", ProcessingTimeSlidingGroupWindow(None, 2.rows, 1.rows)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeSlidingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Slide over 8.milli every 10.milli on 'rowtime)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int")
-      ),
-      term("groupBy", "string"),
-      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 8.milli, 10.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  @Ignore // see comments in DataStreamAggregate
-  def testEventTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Slide over 2.rows every 1.rows on 'rowtime)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      streamTableNode(0),
-      term("groupBy", "string"),
-      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 2.rows, 1.rows)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeSessionGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Session withGap 7.milli on 'rowtime)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int")
-      ),
-      term("groupBy", "string"),
-      term("window", EventTimeSessionGroupWindow(None, RowtimeAttribute(), 7.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllProcessingTimeTumblingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Tumble over 50.milli)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int")
-      ),
-      term("groupBy", "string"),
-      term("window", ProcessingTimeTumblingGroupWindow(None, 50.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 2.rows)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int")
-      ),
-      term("window", ProcessingTimeTumblingGroupWindow(None, 2.rows)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'rowtime)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int")
-      ),
-      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 5.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  @Ignore // see comments in DataStreamAggregate
-  def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 2.rows on 'rowtime)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int")
-      ),
-      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 2.rows)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-
-  @Test
-  def testAllProcessingTimeSlidingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Slide over 50.milli every 50.milli)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int")
-      ),
-      term("window", ProcessingTimeSlidingGroupWindow(None, 50.milli, 50.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllProcessingTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Slide over 2.rows every 1.rows)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int")
-      ),
-      term("window", ProcessingTimeSlidingGroupWindow(None, 2.rows, 1.rows)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'rowtime)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int")
-      ),
-      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 8.milli, 10.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  @Ignore // see comments in DataStreamAggregate
-  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'rowtime)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int")
-      ),
-      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 2.rows, 1.rows)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testAllEventTimeSessionGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Session withGap 7.milli on 'rowtime)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int")
-      ),
-      term("window", EventTimeSessionGroupWindow(None, RowtimeAttribute(), 7.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testTumbleWindowStartEnd(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Tumble over 5.milli on 'rowtime as 'w)
-      .select('string, 'int.count, 'w.start, 'w.end)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int")
-      ),
-      term("groupBy", "string"),
-      term("window",
-        EventTimeTumblingGroupWindow(
-          Some(WindowReference("w")),
-          RowtimeAttribute(),
-          5.milli)),
-      term("select",
-        "string",
-        "COUNT(int) AS TMP_0",
-        "start(WindowReference(w)) AS TMP_1",
-        "end(WindowReference(w)) AS TMP_2")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testSlideWindowStartEnd(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
-      .select('string, 'int.count, 'w.start, 'w.end)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int")
-      ),
-      term("groupBy", "string"),
-      term("window",
-        EventTimeSlidingGroupWindow(
-          Some(WindowReference("w")),
-          RowtimeAttribute(),
-          10.milli,
-          5.milli)),
-      term("select",
-        "string",
-        "COUNT(int) AS TMP_0",
-        "start(WindowReference(w)) AS TMP_1",
-        "end(WindowReference(w)) AS TMP_2")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testSessionWindowStartWithTwoEnd(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Session withGap 3.milli on 'rowtime as 'w)
-      .select('w.end as 'we1, 'string, 'int.count as 'cnt, 'w.start as 'ws, 'w.end as 'we2)
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamAggregate",
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "string", "int")
-        ),
-        term("groupBy", "string"),
-        term("window",
-          EventTimeSessionGroupWindow(
-            Some(WindowReference("w")),
-            RowtimeAttribute(),
-            3.milli)),
-        term("select",
-          "string",
-          "COUNT(int) AS TMP_1",
-          "end(WindowReference(w)) AS TMP_0",
-          "start(WindowReference(w)) AS TMP_2")
-      ),
-      term("select", "TMP_0 AS we1", "string", "TMP_1 AS cnt", "TMP_2 AS ws", "TMP_0 AS we2")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testTumbleWindowWithDuplicateAggsAndProps(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Tumble over 5.millis on 'rowtime as 'w)
-      .select('string, 'int.sum + 1 as 's1, 'int.sum + 3 as 's2, 'w.start as 'x, 'w.start as 'x2,
-        'w.end as 'x3, 'w.end)
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamAggregate",
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "string", "int")
-        ),
-        term("groupBy", "string"),
-        term("window",
-          EventTimeTumblingGroupWindow(
-            Some(WindowReference("w")),
-            RowtimeAttribute(),
-            5.millis)),
-        term("select",
-          "string",
-          "SUM(int) AS TMP_0",
-          "start(WindowReference(w)) AS TMP_1",
-          "end(WindowReference(w)) AS TMP_2")
-      ),
-      term("select",
-        "string",
-        "+(CAST(AS(TMP_0, 'TMP_3')), CAST(1)) AS s1",
-        "+(CAST(AS(TMP_0, 'TMP_4')), CAST(3)) AS s2",
-        "TMP_1 AS x",
-        "TMP_1 AS x2",
-        "TMP_2 AS x3",
-        "TMP_2 AS TMP_5")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala
deleted file mode 100644
index 5096b53..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnionITCase.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{TableEnvironment, ValidationException}
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class UnionITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testUnion(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f)
-
-    val unionDs = ds1.unionAll(ds2).select('c)
-
-    val results = unionDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "Hi", "Hello", "Hello world", "Hi", "Hello", "Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testUnionWithFilter(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
-
-    val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
-
-    val results = unionDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("Hi", "Hallo")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionFieldsNameNotOverlap1(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
-
-    val unionDs = ds1.unionAll(ds2)
-
-    val results = unionDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    assertEquals(true, StreamITCase.testResults.isEmpty)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionFieldsNameNotOverlap2(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    val unionDs = ds1.unionAll(ds2)
-
-    val results = unionDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    println(StreamITCase.testResults)
-    assertEquals(true, StreamITCase.testResults.isEmpty)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionTablesFromDifferentEnvs(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env)
-
-    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv2, 'a, 'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.unionAll(ds2)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
deleted file mode 100644
index 6d1a62e..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UnsupportedOpsTest.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.table
-
-import org.apache.flink.api.scala.stream.utils.StreamTestData
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{TableEnvironment, ValidationException}
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Test
-
-class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
-
-  @Test(expected = classOf[ValidationException])
-  def testSelectWithAggregation(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1.min)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testDistinct(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).distinct()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSort(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).orderBy('_1.desc)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoin(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.join(t2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnion(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.union(t2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testIntersect(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.intersect(t2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testIntersectAll(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.intersectAll(t2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testMinus(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.minus(t2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testMinusAll(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.minusAll(t2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testLimit(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
-    t1.limit(0,5)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
deleted file mode 100644
index 305f1db..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/UserDefinedTableFunctionTest.scala
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.types.Row
-import org.apache.flink.api.table.expressions.utils._
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.{TableEnvironment, TableException, Types, ValidationException}
-import org.apache.flink.api.table.utils.TableTestUtil._
-import org.apache.flink.api.table.utils._
-import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaExecutionEnv}
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment => ScalaExecutionEnv}
-import org.junit.Assert.{assertTrue, fail}
-import org.junit.Test
-import org.mockito.Mockito._
-
-class UserDefinedTableFunctionTest extends TableTestBase {
-
-  @Test
-  def testJavaScalaTableAPIEquality(): Unit = {
-    // mock
-    val ds = mock(classOf[DataStream[Row]])
-    val jDs = mock(classOf[JDataStream[Row]])
-    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
-    when(ds.javaStream).thenReturn(jDs)
-    when(jDs.getType).thenReturn(typeInfo)
-
-    // Scala environment
-    val env = mock(classOf[ScalaExecutionEnv])
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
-
-    // Java environment
-    val javaEnv = mock(classOf[JavaExecutionEnv])
-    val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
-    val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
-
-    // test cross join
-    val func1 = new TableFunc1
-    javaTableEnv.registerFunction("func1", func1)
-    var scalaTable = in1.join(func1('c) as 's).select('c, 's)
-    var javaTable = in2.join("func1(c).as(s)").select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test left outer join
-    scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
-    javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test overloading
-    scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
-    javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test custom result type
-    val func2 = new TableFunc2
-    javaTableEnv.registerFunction("func2", func2)
-    scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
-    javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test hierarchy generic type
-    val hierarchy = new HierarchyTableFunction
-    javaTableEnv.registerFunction("hierarchy", hierarchy)
-    scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
-      .select('c, 'name, 'len, 'adult)
-    javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
-      .select("c, name, len, adult")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test pojo type
-    val pojo = new PojoTableFunc
-    javaTableEnv.registerFunction("pojo", pojo)
-    scalaTable = in1.join(pojo('c))
-      .select('c, 'name, 'age)
-    javaTable = in2.join("pojo(c)")
-      .select("c, name, age")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with filter
-    scalaTable = in1.join(func2('c) as ('name, 'len))
-      .select('c, 'name, 'len).filter('len > 2)
-    javaTable = in2.join("func2(c) as (name, len)")
-      .select("c, name, len").filter("len > 2")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with scalar function
-    scalaTable = in1.join(func1('c.substring(2)) as 's)
-      .select('a, 'c, 's)
-    javaTable = in2.join("func1(substring(c, 2)) as (s)")
-      .select("a, c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // check scala object is forbidden
-    expectExceptionThrown(
-      tableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
-    expectExceptionThrown(
-      javaTableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
-    expectExceptionThrown(
-      in1.join(ObjectTableFunction('a, 1)), "Scala object")
-
-  }
-
-  @Test
-  def testInvalidTableFunction(): Unit = {
-    // mock
-    val util = streamTestUtil()
-    val t = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val tEnv = TableEnvironment.getTableEnvironment(mock(classOf[JavaExecutionEnv]))
-
-    //=================== check scala object is forbidden =====================
-    // Scala table environment register
-    expectExceptionThrown(util.addFunction("udtf", ObjectTableFunction), "Scala object")
-    // Java table environment register
-    expectExceptionThrown(tEnv.registerFunction("udtf", ObjectTableFunction), "Scala object")
-    // Scala Table API directly call
-    expectExceptionThrown(t.join(ObjectTableFunction('a, 1)), "Scala object")
-
-
-    //============ throw exception when table function is not registered =========
-    // Java Table API call
-    expectExceptionThrown(t.join("nonexist(a)"), "Undefined function: NONEXIST")
-    // SQL API call
-    expectExceptionThrown(
-      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
-      "No match found for function signature nonexist(<NUMERIC>)")
-
-
-    //========= throw exception when the called function is a scalar function ====
-    util.addFunction("func0", Func0)
-    // Java Table API call
-    expectExceptionThrown(
-      t.join("func0(a)"),
-      "only accept expressions that define table functions",
-      classOf[TableException])
-    // SQL API call
-    // NOTE: it doesn't throw an exception but an AssertionError, maybe a Calcite bug
-    expectExceptionThrown(
-      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
-      null,
-      classOf[AssertionError])
-
-    //========== throw exception when the parameters is not correct ===============
-    // Java Table API call
-    util.addFunction("func2", new TableFunc2)
-    expectExceptionThrown(
-      t.join("func2(c, c)"),
-      "Given parameters of function 'FUNC2' do not match any signature")
-    // SQL API call
-    expectExceptionThrown(
-      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
-      "No match found for function signature func2(<CHARACTER>, <CHARACTER>)")
-  }
-
-  @Test
-  def testCrossJoin(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = util.addFunction("func1", new TableFunc1)
-
-    val result1 = table.join(function('c) as 's).select('c, 's)
-
-    val expected1 = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", s"$function($$2)"),
-        term("function", function),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "s")
-    )
-
-    util.verifyTable(result1, expected1)
-
-    // test overloading
-
-    val result2 = table.join(function('c, "$") as 's).select('c, 's)
-
-    val expected2 = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", s"$function($$2, '$$')"),
-        term("function", function),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "s")
-    )
-
-    util.verifyTable(result2, expected2)
-  }
-
-  @Test
-  def testLeftOuterJoin(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = util.addFunction("func1", new TableFunc1)
-
-    val result = table.leftOuterJoin(function('c) as 's).select('c, 's)
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", s"$function($$2)"),
-        term("function", function),
-        term("rowType",
-          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
-        term("joinType", "LEFT")
-      ),
-      term("select", "c", "s")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testCustomType(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = util.addFunction("func2", new TableFunc2)
-
-    val result = table.join(function('c) as ('name, 'len)).select('c, 'name, 'len)
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", s"$function($$2)"),
-        term("function", function),
-        term("rowType",
-          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-           "VARCHAR(2147483647) name, INTEGER len)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "name", "len")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testHierarchyType(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = util.addFunction("hierarchy", new HierarchyTableFunction)
-
-    val result = table.join(function('c) as ('name, 'adult, 'len))
-
-    val expected = unaryNode(
-      "DataStreamCorrelate",
-      streamTableNode(0),
-      term("invocation", s"$function($$2)"),
-      term("function", function),
-      term("rowType",
-        "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
-        " VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)"),
-      term("joinType", "INNER")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testPojoType(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = util.addFunction("pojo", new PojoTableFunc)
-
-    val result = table.join(function('c))
-
-    val expected = unaryNode(
-      "DataStreamCorrelate",
-      streamTableNode(0),
-      term("invocation", s"$function($$2)"),
-      term("function", function),
-      term("rowType",
-        "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-         "INTEGER age, VARCHAR(2147483647) name)"),
-      term("joinType", "INNER")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testFilter(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = util.addFunction("func2", new TableFunc2)
-
-    val result = table
-      .join(function('c) as ('name, 'len))
-      .select('c, 'name, 'len)
-      .filter('len > 2)
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", s"$function($$2)"),
-        term("function", function),
-        term("rowType",
-          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-          "VARCHAR(2147483647) name, INTEGER len)"),
-        term("joinType", "INNER"),
-        term("condition", ">($1, 2)")
-      ),
-      term("select", "c", "name", "len")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testScalarFunction(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = util.addFunction("func1", new TableFunc1)
-
-    val result = table.join(function('c.substring(2)) as 's)
-
-    val expected = unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation",  s"$function(SUBSTRING($$2, 2, CHAR_LENGTH($$2)))"),
-        term("function", function),
-        term("rowType",
-          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
-        term("joinType", "INNER")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  // ----------------------------------------------------------------------------------------------
-
-  private def expectExceptionThrown(
-      function: => Unit,
-      keywords: String,
-      clazz: Class[_ <: Throwable] = classOf[ValidationException])
-    : Unit = {
-    try {
-      function
-      fail(s"Expected a $clazz, but no exception is thrown.")
-    } catch {
-      case e if e.getClass == clazz =>
-        if (keywords != null) {
-          assertTrue(
-            s"The exception message '${e.getMessage}' doesn't contain keyword '$keywords'",
-            e.getMessage.contains(keywords))
-        }
-      case e: Throwable => fail(s"Expected throw ${clazz.getSimpleName}, but is $e.")
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala
deleted file mode 100644
index 4fd3cd7..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamITCase.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.utils
-
-import java.util.Collections
-
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import scala.collection.mutable
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import scala.collection.JavaConverters._
-
-object StreamITCase {
-
-  var testResults = mutable.MutableList.empty[String]
-
-  def clear = {
-    StreamITCase.testResults.clear()
-  }
-
-  def compareWithList(expected: java.util.List[String]): Unit = {
-    Collections.sort(expected)
-    assertEquals(expected.asScala, StreamITCase.testResults.sorted)
-  }
-
-  final class StringSink extends RichSinkFunction[Row]() {
-    def invoke(value: Row) {
-      testResults.synchronized {
-        testResults += value.toString 
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamTestData.scala
deleted file mode 100644
index 321b8ac..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/utils/StreamTestData.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.utils
-
-import org.apache.flink.api.scala._
-import scala.collection.mutable
-import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-
-object StreamTestData {
-
-  def getSmall3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
-    val data = new mutable.MutableList[(Int, Long, String)]
-    data.+=((1, 1L, "Hi"))
-    data.+=((2, 2L, "Hello"))
-    data.+=((3, 2L, "Hello world"))
-    env.fromCollection(data)
-  }
-
-  def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
-    val data = new mutable.MutableList[(Int, Long, String)]
-    data.+=((1, 1L, "Hi"))
-    data.+=((2, 2L, "Hello"))
-    data.+=((3, 2L, "Hello world"))
-    data.+=((4, 3L, "Hello world, how are you?"))
-    data.+=((5, 3L, "I am fine."))
-    data.+=((6, 3L, "Luke Skywalker"))
-    data.+=((7, 4L, "Comment#1"))
-    data.+=((8, 4L, "Comment#2"))
-    data.+=((9, 4L, "Comment#3"))
-    data.+=((10, 4L, "Comment#4"))
-    data.+=((11, 5L, "Comment#5"))
-    data.+=((12, 5L, "Comment#6"))
-    data.+=((13, 5L, "Comment#7"))
-    data.+=((14, 5L, "Comment#8"))
-    data.+=((15, 5L, "Comment#9"))
-    data.+=((16, 6L, "Comment#10"))
-    data.+=((17, 6L, "Comment#11"))
-    data.+=((18, 6L, "Comment#12"))
-    data.+=((19, 6L, "Comment#13"))
-    data.+=((20, 6L, "Comment#14"))
-    data.+=((21, 6L, "Comment#15"))
-    env.fromCollection(data)
-  }
-
-  def get5TupleDataStream(env: StreamExecutionEnvironment):
-      DataStream[(Int, Long, Int, String, Long)] = {
-
-    val data = new mutable.MutableList[(Int, Long, Int, String, Long)]
-    data.+=((1, 1L, 0, "Hallo", 1L))
-    data.+=((2, 2L, 1, "Hallo Welt", 2L))
-    data.+=((2, 3L, 2, "Hallo Welt wie", 1L))
-    data.+=((3, 4L, 3, "Hallo Welt wie gehts?", 2L))
-    data.+=((3, 5L, 4, "ABC", 2L))
-    data.+=((3, 6L, 5, "BCD", 3L))
-    data.+=((4, 7L, 6, "CDE", 2L))
-    data.+=((4, 8L, 7, "DEF", 1L))
-    data.+=((4, 9L, 8, "EFG", 1L))
-    data.+=((4, 10L, 9, "FGH", 2L))
-    data.+=((5, 11L, 10, "GHI", 1L))
-    data.+=((5, 12L, 11, "HIJ", 3L))
-    data.+=((5, 13L, 12, "IJK", 3L))
-    data.+=((5, 14L, 13, "JKL", 2L))
-    data.+=((5, 15L, 14, "KLM", 2L))
-    env.fromCollection(data)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala
deleted file mode 100644
index 6c9d2e8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/AggregationTest.scala
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.utils.TableTestBase
-import org.apache.flink.api.table.utils.TableTestUtil._
-import org.junit.Test
-
-/**
-  * Test for testing aggregate plans.
-  */
-class AggregationTest extends TableTestBase {
-
-  @Test
-  def testAggregateQueryBatchSQL(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable"
-
-    val setValues = unaryNode(
-      "DataSetValues",
-      batchTableNode(0),
-      tuples(List(null,null,null)),
-      term("values","a","b","c")
-    )
-    val union = unaryNode(
-      "DataSetUnion",
-      setValues,
-      term("union","a","b","c")
-    )
-
-    val aggregate = unaryNode(
-      "DataSetAggregate",
-      union,
-      term("select",
-        "AVG(a) AS EXPR$0",
-        "SUM(b) AS EXPR$1",
-        "COUNT(c) AS EXPR$2")
-    )
-    util.verifySql(sqlQuery, aggregate)
-  }
-
-  @Test
-  def testAggregateWithFilterQueryBatchSQL(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable WHERE a = 1"
-
-    val calcNode = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", "=(a, 1)")
-    )
-
-    val setValues =  unaryNode(
-        "DataSetValues",
-        calcNode,
-        tuples(List(null,null,null)),
-        term("values","a","b","c")
-    )
-
-    val union = unaryNode(
-      "DataSetUnion",
-      setValues,
-      term("union","a","b","c")
-    )
-
-    val aggregate = unaryNode(
-      "DataSetAggregate",
-      union,
-      term("select",
-        "AVG(a) AS EXPR$0",
-        "SUM(b) AS EXPR$1",
-        "COUNT(c) AS EXPR$2")
-    )
-    util.verifySql(sqlQuery, aggregate)
-  }
-
-  @Test
-  def testAggregateGroupQueryBatchSQL(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable GROUP BY a"
-
-    val aggregate = unaryNode(
-        "DataSetAggregate",
-        batchTableNode(0),
-        term("groupBy", "a"),
-        term("select",
-          "a",
-          "AVG(a) AS EXPR$0",
-          "SUM(b) AS EXPR$1",
-          "COUNT(c) AS EXPR$2")
-    )
-    val expected = unaryNode(
-        "DataSetCalc",
-        aggregate,
-        term("select",
-          "EXPR$0",
-          "EXPR$1",
-          "EXPR$2")
-    )
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testAggregateGroupWithFilterQueryBatchSQL(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM MyTable WHERE a = 1 GROUP BY a"
-
-    val calcNode = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select","a", "b", "c") ,
-      term("where","=(a, 1)")
-    )
-
-    val aggregate = unaryNode(
-        "DataSetAggregate",
-        calcNode,
-        term("groupBy", "a"),
-        term("select",
-          "a",
-          "AVG(a) AS EXPR$0",
-          "SUM(b) AS EXPR$1",
-          "COUNT(c) AS EXPR$2")
-    )
-    val expected = unaryNode(
-        "DataSetCalc",
-        aggregate,
-        term("select",
-          "EXPR$0",
-          "EXPR$1",
-          "EXPR$2")
-    )
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testAggregateGroupWithFilterTableApi(): Unit = {
-
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val resultTable = sourceTable.groupBy('a)
-      .select('a, 'a.avg, 'b.sum, 'c.count)
-      .where('a === 1)
-
-    val calcNode = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", "=(a, 1)")
-    )
-
-    val expected = unaryNode(
-      "DataSetAggregate",
-      calcNode,
-      term("groupBy", "a"),
-      term("select",
-        "a",
-        "AVG(a) AS TMP_0",
-        "SUM(b) AS TMP_1",
-        "COUNT(c) AS TMP_2")
-    )
-
-    util.verifyTable(resultTable,expected)
-  }
-
-  @Test
-  def testAggregateTableApi(): Unit = {
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-    val resultTable = sourceTable.select('a.avg,'b.sum,'c.count)
-
-    val setValues = unaryNode(
-      "DataSetValues",
-      batchTableNode(0),
-      tuples(List(null,null,null)),
-      term("values","a","b","c")
-    )
-    val union = unaryNode(
-      "DataSetUnion",
-      setValues,
-      term("union","a","b","c")
-    )
-
-    val expected = unaryNode(
-      "DataSetAggregate",
-      union,
-      term("select",
-        "AVG(a) AS TMP_0",
-        "SUM(b) AS TMP_1",
-        "COUNT(c) AS TMP_2")
-    )
-    util.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testAggregateWithFilterTableApi(): Unit = {
-    val util = batchTestUtil()
-    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-
-    val resultTable = sourceTable.select('a,'b,'c).where('a === 1)
-      .select('a.avg,'b.sum,'c.count)
-
-    val calcNode = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", "=(a, 1)")
-    )
-
-    val setValues =  unaryNode(
-      "DataSetValues",
-      calcNode,
-      tuples(List(null,null,null)),
-      term("values","a","b","c")
-    )
-
-    val union = unaryNode(
-      "DataSetUnion",
-      setValues,
-      term("union","a","b","c")
-    )
-
-    val expected = unaryNode(
-      "DataSetAggregate",
-      union,
-      term("select",
-        "AVG(a) AS TMP_0",
-        "SUM(b) AS TMP_1",
-        "COUNT(c) AS TMP_2")
-    )
-
-    util.verifyTable(resultTable, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CalciteConfigBuilderTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CalciteConfigBuilderTest.scala
deleted file mode 100644
index 2b0d446..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CalciteConfigBuilderTest.scala
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import org.apache.calcite.rel.rules.{CalcSplitRule, CalcMergeRule, FilterMergeRule}
-import org.apache.calcite.sql.fun.{SqlStdOperatorTable, OracleSqlOperatorTable}
-import org.apache.calcite.tools.RuleSets
-import org.junit.Test
-import org.junit.Assert._
-
-import scala.collection.JavaConverters._
-
-class CalciteConfigBuilderTest {
-
-  @Test
-  def testDefaultRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .build()
-
-    assertEquals(false, cc.replacesRuleSet)
-    assertFalse(cc.getRuleSet.isDefined)
-  }
-
-  @Test
-  def testReplaceRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-      .build()
-
-    assertEquals(true, cc.replacesRuleSet)
-    assertTrue(cc.getRuleSet.isDefined)
-    val cSet = cc.getRuleSet.get.iterator().asScala.toSet
-    assertEquals(1, cSet.size)
-    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
-  }
-
-  @Test
-  def testReplaceAddRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-      .addRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
-      .build()
-
-    assertEquals(true, cc.replacesRuleSet)
-    assertTrue(cc.getRuleSet.isDefined)
-    val cSet = cc.getRuleSet.get.iterator().asScala.toSet
-    assertEquals(3, cSet.size)
-    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
-    assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
-    assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
-  }
-
-  @Test
-  def testAddRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-      .build()
-
-    assertEquals(false, cc.replacesRuleSet)
-    assertTrue(cc.getRuleSet.isDefined)
-    val cSet = cc.getRuleSet.get.iterator().asScala.toSet
-    assertEquals(1, cSet.size)
-    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
-  }
-
-  @Test
-  def testAddAddRules(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
-      .addRuleSet(RuleSets.ofList(CalcMergeRule.INSTANCE, CalcSplitRule.INSTANCE))
-      .build()
-
-    assertEquals(false, cc.replacesRuleSet)
-    assertTrue(cc.getRuleSet.isDefined)
-    val cSet = cc.getRuleSet.get.iterator().asScala.toSet
-    assertEquals(3, cSet.size)
-    assertTrue(cSet.contains(FilterMergeRule.INSTANCE))
-    assertTrue(cSet.contains(CalcMergeRule.INSTANCE))
-    assertTrue(cSet.contains(CalcSplitRule.INSTANCE))
-  }
-
-  @Test
-  def testDefaultOperatorTable(): Unit = {
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .build()
-
-    assertEquals(false, cc.replacesSqlOperatorTable)
-    assertFalse(cc.getSqlOperatorTable.isDefined)
-  }
-
-  def testReplaceOperatorTable(): Unit = {
-
-    val oracleTable = new OracleSqlOperatorTable
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceSqlOperatorTable(oracleTable)
-      .build()
-
-    val oracleOps = oracleTable.getOperatorList.asScala
-
-    assertEquals(true, cc.replacesSqlOperatorTable)
-    assertTrue(cc.getSqlOperatorTable.isDefined)
-    val ops = cc.getSqlOperatorTable.get.getOperatorList
-      .asScala.toSet
-    assertEquals(oracleOps.size, ops.size)
-    for (o <- oracleOps) {
-      assertTrue(ops.contains(o))
-    }
-  }
-
-  def testReplaceAddOperatorTable(): Unit = {
-
-    val oracleTable = new OracleSqlOperatorTable
-    val stdTable = new SqlStdOperatorTable
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .replaceSqlOperatorTable(oracleTable)
-      .addSqlOperatorTable(stdTable)
-      .build()
-
-    val oracleOps = oracleTable.getOperatorList.asScala
-    val stdOps = stdTable.getOperatorList.asScala
-
-    assertEquals(true, cc.replacesSqlOperatorTable)
-    assertTrue(cc.getSqlOperatorTable.isDefined)
-    val ops = cc.getSqlOperatorTable.get.getOperatorList
-      .asScala.toSet
-    assertEquals(oracleOps.size + stdOps.size, ops.size)
-    for (o <- oracleOps) {
-      assertTrue(ops.contains(o))
-    }
-    for (o <- stdOps) {
-      assertTrue(ops.contains(o))
-    }
-
-  }
-
-  def testAddOperatorTable(): Unit = {
-
-    val oracleTable = new OracleSqlOperatorTable
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addSqlOperatorTable(oracleTable)
-      .build()
-
-    val oracleOps = oracleTable.getOperatorList.asScala
-
-    assertEquals(false, cc.replacesSqlOperatorTable)
-    assertTrue(cc.getSqlOperatorTable.isDefined)
-    val ops = cc.getSqlOperatorTable.get.getOperatorList
-      .asScala.toSet
-    assertEquals(oracleOps.size, ops.size)
-    for (o <- oracleOps) {
-      assertTrue(ops.contains(o))
-    }
-  }
-
-  def testAddAddOperatorTable(): Unit = {
-
-    val oracleTable = new OracleSqlOperatorTable
-    val stdTable = new SqlStdOperatorTable
-
-    val cc: CalciteConfig = new CalciteConfigBuilder()
-      .addSqlOperatorTable(oracleTable)
-      .addSqlOperatorTable(stdTable)
-      .build()
-
-    val oracleOps = oracleTable.getOperatorList.asScala
-    val stdOps = stdTable.getOperatorList.asScala
-
-    assertEquals(false, cc.replacesSqlOperatorTable)
-    assertTrue(cc.getSqlOperatorTable.isDefined)
-    val ops = cc.getSqlOperatorTable.get.getOperatorList
-      .asScala.toSet
-    assertEquals(oracleOps.size + stdOps.size, ops.size)
-    for (o <- oracleOps) {
-      assertTrue(ops.contains(o))
-    }
-    for (o <- stdOps) {
-      assertTrue(ops.contains(o))
-    }
-
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CompositeFlatteningTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CompositeFlatteningTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CompositeFlatteningTest.scala
deleted file mode 100644
index f14b9d8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/CompositeFlatteningTest.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.createTypeInformation
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.CompositeFlatteningTest.{TestCaseClass, giveMeCaseClass}
-import org.apache.flink.api.table.functions.ScalarFunction
-import org.apache.flink.api.table.utils.TableTestBase
-import org.apache.flink.api.table.utils.TableTestUtil._
-import org.junit.Test
-
-
-class CompositeFlatteningTest extends TableTestBase {
-
-  @Test(expected = classOf[ValidationException])
-  def testDuplicateFlattening(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
-
-    table.select('a.flatten(), 'a.flatten())
-  }
-
-  @Test
-  def testMultipleFlatteningsTable(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table.select('a.flatten(), 'c, 'b.flatten())
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "a._1 AS a$_1",
-        "a._2 AS a$_2",
-        "c",
-        "b._1 AS b$_1",
-        "b._2 AS b$_2"
-      )
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testMultipleFlatteningsSql(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "a._1 AS _1",
-        "a._2 AS _2",
-        "c",
-        "b._1 AS _10",
-        "b._2 AS _20"
-      )
-    )
-
-    util.verifySql(
-      "SELECT MyTable.a.*, c, MyTable.b.* FROM MyTable",
-      expected)
-  }
-
-  @Test
-  def testNestedFlattenings(): Unit = {
-    val util = batchTestUtil()
-    val table = util
-      .addTable[((((String, TestCaseClass), Boolean), String), String)]("MyTable", 'a, 'b)
-
-    val result = table.select('a.flatten(), 'b.flatten())
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "a._1 AS a$_1",
-        "a._2 AS a$_2",
-        "b"
-      )
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testScalarFunctionAccess(): Unit = {
-    val util = batchTestUtil()
-    val table = util
-      .addTable[(String, Int)]("MyTable", 'a, 'b)
-
-    val result = table.select(
-      giveMeCaseClass().get("my"),
-      giveMeCaseClass().get("clazz"),
-      giveMeCaseClass().flatten())
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "org.apache.flink.api.table.CompositeFlatteningTest.giveMeCaseClass$().my AS _c0",
-        "org.apache.flink.api.table.CompositeFlatteningTest.giveMeCaseClass$().clazz AS _c1",
-        "org.apache.flink.api.table.CompositeFlatteningTest.giveMeCaseClass$().my AS _c2",
-        "org.apache.flink.api.table.CompositeFlatteningTest.giveMeCaseClass$().clazz AS _c3"
-      )
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-}
-
-object CompositeFlatteningTest {
-
-  case class TestCaseClass(my: String, clazz: Int)
-
-  object giveMeCaseClass extends ScalarFunction {
-    def eval(): TestCaseClass = {
-      TestCaseClass("hello", 42)
-    }
-
-    override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
-      createTypeInformation[TestCaseClass]
-    }
-  }
-}