Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:40 UTC

[11/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
deleted file mode 100644
index 195027d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.{TableEnvironment, TableException, ValidationException}
-import org.apache.flink.types.Row
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class JoinITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('b === 'e).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithFilter(): Unit = {
-
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
-
-    val expected = "Hi,Hallo\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithJoinFilter(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g)
-
-    val expected = "Hello world, how are you?,Hallo Welt wie\n" +
-      "I am fine.,Hallo Welt wie\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
-      "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinNonExistingKey(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. Field 'foo does not exist
-      .where('foo === 'e)
-      .select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinWithNonMatchingKeyTypes(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. Field 'a is Int, and 'g is String
-      .where('a === 'g)
-      .select('c, 'g).collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinWithAmbiguousFields(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'c)
-
-    ds1.join(ds2)
-      // must fail. Both inputs share the same field 'c
-      .where('a === 'd)
-      .select('c, 'g)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testNoEqualityJoinPredicate1(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. 'd === 'f compares two fields of the same input (ds2), so no equi-join predicate connects the two tables
-      .where('d === 'f)
-      .select('c, 'g).collect()
-  }
-
-  @Test(expected = classOf[TableException])
-  def testNoEqualityJoinPredicate2(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. No equality join predicate
-      .where('a < 'd)
-      .select('c, 'g).collect()
-  }
-
-  @Test
-  def testJoinWithAggregation(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('a === 'd).select('g.count)
-
-    val expected = "6"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithGroupedAggregation(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2)
-      .where('a === 'd)
-      .groupBy('a, 'd)
-      .select('b.sum, 'g.count)
-
-    val expected = "6,3\n" + "4,2\n" + "1,1"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinPushThroughJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'j, 'k, 'l)
-
-    val joinT = ds1.join(ds2)
-      .where(Literal(true))
-      .join(ds3)
-      .where('a === 'd && 'e === 'k)
-      .select('a, 'f, 'l)
-
-    val expected = "2,1,Hello\n" + "2,1,Hello world\n" + "1,0,Hi"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithDisjunctivePred(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).filter('a === 'd && ('b === 'e || 'b === 'e - 10)).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" +
-      "Hello,Hallo Welt\n" +
-      "I am fine.,IJK"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithExpressionPreds(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g)
-
-    val expected = "I am fine.,Hallo Welt\n" +
-      "Luke Skywalker,Hallo Welt wie gehts?\n" +
-      "Luke Skywalker,ABC\n" +
-      "Comment#2,HIJ\n" +
-      "Comment#2,IJK"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv2, 'd, 'e, 'f, 'g, 'h)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.join(ds2).where('b === 'e).select('c, 'g)
-  }
-
-  @Test
-  def testLeftJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
-      "Hello world,ABC\n" + "Hello world, how are you?,null\n" + "I am fine.,HIJ\n" +
-      "I am fine.,IJK\n" + "Luke Skywalker,null\n" + "Comment#1,null\n" + "Comment#2,null\n" +
-      "Comment#3,null\n" + "Comment#4,null\n" + "Comment#5,null\n" + "Comment#6,null\n" +
-      "Comment#7,null\n" + "Comment#8,null\n" + "Comment#9,null\n" + "Comment#10,null\n" +
-      "Comment#11,null\n" + "Comment#12,null\n" + "Comment#13,null\n" + "Comment#14,null\n" +
-      "Comment#15,null\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testNoJoinCondition(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds2.leftOuterJoin(ds1, 'b === 'd && 'b < 3).select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testNoEquiJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds2.leftOuterJoin(ds1, 'b < 'd).select('c, 'g)
-  }
-
-  @Test
-  def testRightJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.rightOuterJoin(ds2, "a = d && b = h").select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "null,Hallo Welt wie\n" +
-      "Hello world,Hallo Welt wie gehts?\n" + "Hello world,ABC\n" + "null,BCD\n" + "null,CDE\n" +
-      "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "I am fine.,HIJ\n" +
-      "I am fine.,IJK\n" + "null,JKL\n" + "null,KLM\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testRightJoinWithNotOnlyEquiJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.rightOuterJoin(ds2, "a = d && b < h").select('c, 'g)
-
-    val expected = "Hello world,BCD\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFullOuterJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.fullOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "null,Hallo Welt wie\n" +
-      "Hello world,Hallo Welt wie gehts?\n" + "Hello world,ABC\n" + "null,BCD\n" + "null,CDE\n" +
-      "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "I am fine.,HIJ\n" +
-      "I am fine.,IJK\n" + "null,JKL\n" + "null,KLM\n" + "Luke Skywalker,null\n" +
-      "Comment#1,null\n" + "Comment#2,null\n" + "Comment#3,null\n" + "Comment#4,null\n" +
-      "Comment#5,null\n" + "Comment#6,null\n" + "Comment#7,null\n" + "Comment#8,null\n" +
-      "Comment#9,null\n" + "Comment#10,null\n" + "Comment#11,null\n" + "Comment#12,null\n" +
-      "Comment#13,null\n" + "Comment#14,null\n" + "Comment#15,null\n" +
-      "Hello world, how are you?,null\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-}

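For reviewers following the series: this part only deletes the old locations; the tests reappear under the new package tree in the other parts of this 47-part commit. A sketch of how the header of JoinITCase presumably looks after the move, assuming the org.apache.flink.table.* layout that [FLINK-4704] introduces (the exact destination paths are fixed by the later parts, not by this one):

    package org.apache.flink.table.api.scala.batch.table

    // the old api.table / api.scala.table packages flip to table.api.*
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
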
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
deleted file mode 100644
index 0d32cb4..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperatorsITCase.scala
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{TableEnvironment, ValidationException}
-import org.apache.flink.types.Row
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.util.Random
-
-@RunWith(classOf[Parameterized])
-class SetOperatorsITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testUnionAll(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f)
-
-    val unionDs = ds1.unionAll(ds2).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUnion(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f)
-
-    val unionDs = ds1.union(ds2).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTernaryUnionAll(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val unionDs = ds1.unionAll(ds2).unionAll(ds3).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" +
-      "Hi\n" + "Hello\n" + "Hello world\n" +
-      "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTernaryUnion(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val unionDs = ds1.union(ds2).union(ds3).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionDifferentColumnSize(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
-
-    // must fail. Union inputs have a different number of columns.
-    ds1.unionAll(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionDifferentFieldTypes(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Union inputs have different field types.
-    ds1.unionAll(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.unionAll(ds2).select('c)
-  }
-
-  @Test
-  def testMinusAll(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = env.fromElements((1, 1L, "Hi")).toTable(tEnv, 'a, 'b, 'c)
-
-    val minusDs = ds1.unionAll(ds1).unionAll(ds1)
-      .minusAll(ds2.unionAll(ds2)).select('c)
-
-    val results = minusDs.toDataSet[Row].collect()
-    val expected = "Hi\n" +
-      "Hello\n" + "Hello world\n" +
-      "Hello\n" + "Hello world\n" +
-      "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testMinus(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = env.fromElements((1, 1L, "Hi")).toTable(tEnv, 'a, 'b, 'c)
-
-    val minusDs = ds1.unionAll(ds1).unionAll(ds1)
-      .minus(ds2.unionAll(ds2)).select('c)
-
-    val results = minusDs.toDataSet[Row].collect()
-    val expected = "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testMinusDifferentFieldTypes(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Minus inputs have different field types.
-    ds1.minus(ds2)
-  }
-
-  @Test
-  def testMinusDifferentFieldNames(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = env.fromElements((1, 1L, "Hi")).toTable(tEnv, 'd, 'e, 'f)
-
-    val minusDs = ds1.unionAll(ds1).unionAll(ds1)
-      .minus(ds2.unionAll(ds2)).select('c)
-
-    val results = minusDs.toDataSet[Row].collect()
-    val expected = "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testMinusAllTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.minusAll(ds2).select('c)
-  }
-
-  @Test
-  def testIntersect(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val data = new mutable.MutableList[(Int, Long, String)]
-    data.+=((1, 1L, "Hi"))
-    data.+=((2, 2L, "Hello"))
-    data.+=((2, 2L, "Hello"))
-    data.+=((3, 2L, "Hello world!"))
-    val ds2 = env.fromCollection(Random.shuffle(data)).toTable(tEnv, 'a, 'b, 'c)
-
-    val intersectDS = ds1.intersect(ds2).select('c).toDataSet[Row]
-
-    val results = intersectDS.collect()
-
-    val expected = "Hi\n" + "Hello\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testIntersectAll(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val data1 = new mutable.MutableList[Int]
-    data1 += (1, 1, 1, 2, 2)
-    val data2 = new mutable.MutableList[Int]
-    data2 += (1, 2, 2, 2, 3)
-    val ds1 = env.fromCollection(data1).toTable(tEnv, 'c)
-    val ds2 = env.fromCollection(data2).toTable(tEnv, 'c)
-
-    val intersectDS = ds1.intersectAll(ds2).select('c).toDataSet[Row]
-
-    val expected = "1\n2\n2"
-    val results = intersectDS.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testIntersectWithDifferentFieldNames(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'e, 'f, 'g)
-
-    val intersectDs = ds1.intersect(ds2).select('c)
-
-    val results = intersectDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testIntersectWithDifferentFieldTypes(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Intersect inputs have different field types.
-    ds1.intersect(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testIntersectTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.intersect(ds2).select('c)
-  }
-
-  @Test
-  def testIntersectWithScalarExpression(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('a + 1, 'b, 'c)
-    val ds2 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('a + 1, 'b, 'c)
-
-    val intersectDs = ds1.intersect(ds2)
-
-    val results = intersectDs.toDataSet[Row].collect()
-    val expected = "2,1,Hi\n" + "3,2,Hello\n" + "4,2,Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}

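The set-operator tests above all pivot on one distinction: union, minus, and intersect deduplicate, while the *All variants preserve multiplicities. A minimal self-contained sketch using the pre-refactoring imports from the deleted file (the object name and data are illustrative):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.TableEnvironment
    import org.apache.flink.types.Row

    object UnionSemanticsSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        val t1 = env.fromElements((1, "Hi"), (1, "Hi")).toTable(tEnv, 'a, 'b)
        val t2 = env.fromElements((1, "Hi")).toTable(tEnv, 'a, 'b)

        // unionAll keeps every input row: three copies of (1,Hi)
        println(t1.unionAll(t2).toDataSet[Row].collect())
        // union deduplicates: a single (1,Hi)
        println(t1.union(t2).toDataSet[Row].collect())
      }
    }
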
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
deleted file mode 100644
index b3cc054..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SortITCase.scala
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala.batch.utils.SortTestUtils._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.api.table.{TableEnvironment, ValidationException}
-import org.apache.flink.types.Row
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class SortITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  def getExecutionEnvironment = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(4)
-    env
-  }
-
-  @Test
-  def testOrderByDesc(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.desc)
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      - x.productElement(0).asInstanceOf[Int] )
-
-    val expected = sortExpectedly(tupleDataSetStrings)
-    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
-
-    val result = results
-      .filterNot(_.isEmpty)
-      .sortBy(_.head)(Ordering.by(f=> f.toString))
-      .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByAsc(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.asc)
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      x.productElement(0).asInstanceOf[Int] )
-
-    val expected = sortExpectedly(tupleDataSetStrings)
-    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
-
-    val result = results
-      .filterNot(_.isEmpty)
-      .sortBy(_.head)(Ordering.by(f=> f.toString))
-      .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByMultipleFieldsDifferentDirections(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_2.asc, '_1.desc)
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      (x.productElement(1).asInstanceOf[Long], - x.productElement(0).asInstanceOf[Int]) )
-
-    val expected = sortExpectedly(tupleDataSetStrings)
-    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
-
-    val result = results
-      .filterNot(_.isEmpty)
-      .sortBy(_.head)(Ordering.by(f=> f.toString))
-      .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByOffset(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(3)
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      x.productElement(0).asInstanceOf[Int] )
-
-    val expected = sortExpectedly(tupleDataSetStrings, 3, 21)
-    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
-
-    val result = results
-      .filterNot(_.isEmpty)
-      .sortBy(_.head)(Ordering.by(f=> f.toString))
-      .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByOffsetAndFetch(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.desc).limit(3, 5)
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      - x.productElement(0).asInstanceOf[Int] )
-
-    val expected = sortExpectedly(tupleDataSetStrings, 3, 8)
-    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
-
-    val result = results
-      .filterNot(_.isEmpty)
-      .sortBy(_.head)(Ordering.by(f=> f.toString))
-      .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test
-  def testOrderByFetch(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).orderBy('_1.asc).limit(0, 5)
-    implicit def rowOrdering[T <: Product] = Ordering.by((x : T) =>
-      x.productElement(0).asInstanceOf[Int] )
-
-    val expected = sortExpectedly(tupleDataSetStrings, 0, 5)
-    val results = t.toDataSet[Row].mapPartition(rows => Seq(rows.toSeq)).collect()
-
-    val result = results
-      .filterNot(_.isEmpty)
-      .sortBy(_.head)(Ordering.by(f=> f.toString))
-      .reduceLeft(_ ++ _)
-
-    TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testFetchWithoutOrder(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).limit(0, 5)
-
-    t.toDataSet[Row].collect()
-  }
-
-}

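The offset/fetch tests above use limit in two arities. Reusing the names from the deleted file, the three calls behave as follows; a fetch without a preceding orderBy throws the ValidationException exercised by testFetchWithoutOrder:

    val sorted = ds.toTable(tEnv).orderBy('_1.asc)

    sorted.limit(3)      // offset only: skip the first three rows, return the rest (testOrderByOffset)
    sorted.limit(0, 5)   // offset 0, fetch 5: exactly the first five rows (testOrderByFetch)
    sorted.limit(3, 5)   // skip three rows, then return the next five (testOrderByOffsetAndFetch)
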
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
deleted file mode 100644
index 285a181..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/UserDefinedTableFunctionTest.scala
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet => JDataSet, ExecutionEnvironment => JavaExecutionEnv}
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment => ScalaExecutionEnv, _}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.utils.TableTestUtil._
-import org.apache.flink.api.table.utils.{PojoTableFunc, TableFunc2, _}
-import org.apache.flink.api.table.{TableEnvironment, Types}
-import org.apache.flink.types.Row
-import org.junit.Test
-import org.mockito.Mockito._
-
-class UserDefinedTableFunctionTest extends TableTestBase {
-
-  @Test
-  def testJavaScalaTableAPIEquality(): Unit = {
-    // mock
-    val ds = mock(classOf[DataSet[Row]])
-    val jDs = mock(classOf[JDataSet[Row]])
-    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
-    when(ds.javaSet).thenReturn(jDs)
-    when(jDs.getType).thenReturn(typeInfo)
-
-    // Scala environment
-    val env = mock(classOf[ScalaExecutionEnv])
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
-
-    // Java environment
-    val javaEnv = mock(classOf[JavaExecutionEnv])
-    val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
-    val in2 = javaTableEnv.fromDataSet(jDs).as("a, b, c")
-    javaTableEnv.registerTable("MyTable", in2)
-
-    // test cross join
-    val func1 = new TableFunc1
-    javaTableEnv.registerFunction("func1", func1)
-    var scalaTable = in1.join(func1('c) as 's).select('c, 's)
-    var javaTable = in2.join("func1(c).as(s)").select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test left outer join
-    scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
-    javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test overloading
-    scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
-    javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test custom result type
-    val func2 = new TableFunc2
-    javaTableEnv.registerFunction("func2", func2)
-    scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
-    javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test hierarchy generic type
-    val hierarchy = new HierarchyTableFunction
-    javaTableEnv.registerFunction("hierarchy", hierarchy)
-    scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
-      .select('c, 'name, 'len, 'adult)
-    javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
-      .select("c, name, len, adult")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test pojo type
-    val pojo = new PojoTableFunc
-    javaTableEnv.registerFunction("pojo", pojo)
-    scalaTable = in1.join(pojo('c))
-      .select('c, 'name, 'age)
-    javaTable = in2.join("pojo(c)")
-      .select("c, name, age")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with filter
-    scalaTable = in1.join(func2('c) as ('name, 'len))
-      .select('c, 'name, 'len).filter('len > 2)
-    javaTable = in2.join("func2(c) as (name, len)")
-      .select("c, name, len").filter("len > 2")
-    verifyTableEquals(scalaTable, javaTable)
-
-    // test with scalar function
-    scalaTable = in1.join(func1('c.substring(2)) as 's)
-      .select('a, 'c, 's)
-    javaTable = in2.join("func1(substring(c, 2)) as (s)")
-      .select("a, c, s")
-    verifyTableEquals(scalaTable, javaTable)
-  }
-
-  @Test
-  def testCrossJoin(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = util.addFunction("func1", new TableFunc1)
-
-    val result1 = table.join(function('c) as 's).select('c, 's)
-
-    val expected1 = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", s"$function($$2)"),
-        term("function", function),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "s")
-    )
-
-    util.verifyTable(result1, expected1)
-
-    // test overloading
-
-    val result2 = table.join(function('c, "$") as 's).select('c, 's)
-
-    val expected2 = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", s"$function($$2, '$$')"),
-        term("function", function),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "s")
-    )
-
-    util.verifyTable(result2, expected2)
-  }
-
-  @Test
-  def testLeftOuterJoin(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = util.addFunction("func1", new TableFunc1)
-
-    val result = table.leftOuterJoin(function('c) as 's).select('c, 's)
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      unaryNode(
-        "DataSetCorrelate",
-        batchTableNode(0),
-        term("invocation", s"$function($$2)"),
-        term("function", function),
-        term("rowType",
-          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
-        term("joinType", "LEFT")
-      ),
-      term("select", "c", "s")
-    )
-
-    util.verifyTable(result, expected)
-  }
-}

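TableFunc1, TableFunc2, HierarchyTableFunction, and PojoTableFunc are test helpers whose bodies are not part of this patch. As a hypothetical stand-in with the same shape as func2 (one String in, (name, len) rows out), a table function under the pre-refactoring classpath would look roughly like this:

    // assumption: TableFunction still lives at org.apache.flink.api.table.functions in this tree
    import org.apache.flink.api.table.functions.TableFunction

    // illustrative only; not the actual TableFunc2 from the test utils
    class SplitWithLength extends TableFunction[(String, Int)] {
      def eval(str: String): Unit =
        str.split(" ").foreach(s => collect((s, s.length)))
    }

It would be registered via registerFunction and applied with join(... as ('name, 'len)), exactly as the tests above do with func2.
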
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
deleted file mode 100644
index 8d1f653..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/SortTestUtils.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.utils
-
-object SortTestUtils {
-
-  val tupleDataSetStrings = List((1, 1L, "Hi")
-    ,(2, 2L, "Hello")
-    ,(3, 2L, "Hello world")
-    ,(4, 3L, "Hello world, how are you?")
-    ,(5, 3L, "I am fine.")
-    ,(6, 3L, "Luke Skywalker")
-    ,(7, 4L, "Comment#1")
-    ,(8, 4L, "Comment#2")
-    ,(9, 4L, "Comment#3")
-    ,(10, 4L, "Comment#4")
-    ,(11, 5L, "Comment#5")
-    ,(12, 5L, "Comment#6")
-    ,(13, 5L, "Comment#7")
-    ,(14, 5L, "Comment#8")
-    ,(15, 5L, "Comment#9")
-    ,(16, 6L, "Comment#10")
-    ,(17, 6L, "Comment#11")
-    ,(18, 6L, "Comment#12")
-    ,(19, 6L, "Comment#13")
-    ,(20, 6L, "Comment#14")
-    ,(21, 6L, "Comment#15"))
-
-  def sortExpectedly(dataSet: List[Product])
-                    (implicit ordering: Ordering[Product]): String = 
-    sortExpectedly(dataSet, 0, dataSet.length)
-
-  def sortExpectedly(dataSet: List[Product], start: Int, end: Int)
-                    (implicit ordering: Ordering[Product]): String = {
-    dataSet
-      .sorted(ordering)
-      .slice(start, end)
-      .mkString("\n")
-      .replaceAll("[\\(\\)]", "")
-  }
-
-}

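sortExpectedly resolves whatever implicit Ordering[Product] is in scope at the call site, which is how SortITCase above steers one shared expected-result helper through ascending, descending, and multi-field runs; note that the (start, end) overload slices end-exclusively. For example:

    import org.apache.flink.api.scala.batch.utils.SortTestUtils._

    // ascending on the first tuple field, as in testOrderByAsc
    implicit val byFirstField: Ordering[Product] =
      Ordering.by((x: Product) => x.productElement(0).asInstanceOf[Int])

    val all   = sortExpectedly(tupleDataSetStrings)        // all 21 rows, sorted
    val slice = sortExpectedly(tupleDataSetStrings, 3, 8)  // sorted rows at indices 3..7
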
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala
deleted file mode 100644
index 2ce42d4..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.utils
-
-import java.util
-
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.{EFFICIENT, NO_NULL, TableConfigMode}
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConversions._
-
-class TableProgramsTestBase(
-    mode: TestExecutionMode,
-    tableConfigMode: TableConfigMode)
-  extends MultipleProgramsTestBase(mode) {
-
-  def config: TableConfig = {
-    val conf = new TableConfig
-    tableConfigMode match {
-      case NO_NULL =>
-        conf.setNullCheck(false)
-      case EFFICIENT =>
-        conf.setEfficientTypeUsage(true)
-      case _ => // keep default
-    }
-    conf
-  }
-}
-
-object TableProgramsTestBase {
-  case class TableConfigMode(nullCheck: Boolean, efficientTypes: Boolean)
-
-  val DEFAULT = TableConfigMode(nullCheck = true, efficientTypes = false)
-  val NO_NULL = TableConfigMode(nullCheck = false, efficientTypes = false)
-  val EFFICIENT = TableConfigMode(nullCheck = false, efficientTypes = true)
-
-  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
-  def parameters(): util.Collection[Array[java.lang.Object]] = {
-    Seq[Array[AnyRef]](Array(TestExecutionMode.COLLECTION, DEFAULT))
-  }
-}

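parameters() pins every subclass to the COLLECTION execution mode with the DEFAULT config, so the NO_NULL and EFFICIENT modes above are currently dormant. Widening the matrix would be a one-line change to the returned Seq; a sketch, where the second combination is hypothetical and not part of this patch:

    @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
    def parameters(): util.Collection[Array[java.lang.Object]] = {
      Seq[Array[AnyRef]](
        Array(TestExecutionMode.COLLECTION, DEFAULT),
        Array(TestExecutionMode.CLUSTER, EFFICIENT)) // hypothetical extra combination
    }
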
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
deleted file mode 100644
index 5eebb34..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/ExplainStreamTest.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert.assertEquals
-import org.junit._
-
-class ExplainStreamTest
-  extends StreamingMultipleProgramsTestBase {
-
-  val testFilePath = ExplainStreamTest.this.getClass.getResource("/").getFile
-
-  @Test
-  def testFilter(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env.fromElements((1, "hello"))
-      .toTable(tEnv, 'a, 'b)
-      .filter("a % 2 = 0")
-
-    val result = replaceString(tEnv.explain(table))
-
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilterStream0.out").mkString
-    val expect = replaceString(source)
-    assertEquals(expect, result)
-  }
-
-  @Test
-  def testUnion(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table = table1.unionAll(table2)
-
-    val result = replaceString(tEnv.explain(table))
-
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnionStream0.out").mkString
-    val expect = replaceString(source)
-    assertEquals(expect, result)
-  }
-
-  def replaceString(s: String): String = {
-    /* Stage {id} is ignored because the id keeps incrementing within the test class
-     * while the StreamExecutionEnvironment is up
-     */
-    s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
-  }
-}

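replaceString keeps the comparison stable across runs by normalizing line endings and erasing the ever-incrementing stage ids; for instance:

    replaceString("Stage 12 : Data Source\r\n")
    // -> " : Data Source\n"  (id stripped, CRLF normalized)
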
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala
deleted file mode 100644
index f3eb87c..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSinkITCase.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream
-
-import java.io.File
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.StreamTestData
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.api.table.sinks.CsvTableSink
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.test.util.TestBaseUtils
-
-import org.junit.Test
-
-class TableSinkITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testStreamTableSink(): Unit = {
-
-    val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
-    tmpFile.deleteOnExit()
-    val path = tmpFile.toURI.toString
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setParallelism(4)
-
-    val input = StreamTestData.get3TupleDataStream(env)
-      .map(x => x).setParallelism(4) // increase DOP to 4
-
-    val results = input.toTable(tEnv, 'a, 'b, 'c)
-      .where('a < 5 || 'a > 17)
-      .select('c, 'b)
-      .writeToSink(new CsvTableSink(path))
-
-    env.execute()
-
-    val expected = Seq(
-      "Hi,1", "Hello,2", "Hello world,2", "Hello world, how are you?,3",
-      "Comment#12,6", "Comment#13,6", "Comment#14,6", "Comment#15,6").mkString("\n")
-
-    TestBaseUtils.compareResultsByLinesInMemory(expected, path)
-  }
-
-}
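
CsvTableSink also accepted a field-delimiter argument in this API version, so the same pipeline can emit, for example, pipe-delimited output. A minimal sketch of that variant (the two-argument constructor is an assumption about this era's sink API):

  input.toTable(tEnv, 'a, 'b, 'c)
    .where('a < 5 || 'a > 17)
    .select('c, 'b)
    .writeToSink(new CsvTableSink(path, "|")) // e.g. "Hi|1", "Hello|2", ...
  env.execute()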

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
deleted file mode 100644
index 1c93112..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/TableSourceITCase.scala
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream
-
-import java.io.{File, FileOutputStream, OutputStreamWriter}
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.StreamITCase
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.sources.{CsvTableSource, StreamTableSource}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class TableSourceITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testStreamTableSourceTableAPI(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33))
-    tEnv.ingest("MyTestTable")
-      .where('amount < 4)
-      .select('amount * 'id, 'name)
-      .toDataStream[Row]
-      .addSink(new StreamITCase.StringSink)
-
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "0,Record_0", "0,Record_16", "0,Record_32",
-      "1,Record_1", "17,Record_17", "36,Record_18",
-      "4,Record_2", "57,Record_19", "9,Record_3")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testStreamTableSourceSQL(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33))
-    tEnv.sql(
-      "SELECT amount * id, name FROM MyTestTable WHERE amount < 4")
-      .toDataStream[Row]
-      .addSink(new StreamITCase.StringSink)
-
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "0,Record_0", "0,Record_16", "0,Record_32",
-      "1,Record_1", "17,Record_17", "36,Record_18",
-      "4,Record_2", "57,Record_19", "9,Record_3")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testCsvTableSource(): Unit = {
-
-    val csvRecords = Seq(
-      "First#Id#Score#Last",
-      "Mike#1#12.3#Smith",
-      "Bob#2#45.6#Taylor",
-      "Sam#3#7.89#Miller",
-      "Peter#4#0.12#Smith",
-      "% Just a comment",
-      "Liz#5#34.5#Williams",
-      "Sally#6#6.78#Miller",
-      "Alice#7#90.1#Smith",
-      "Kelly#8#2.34#Williams"
-    )
-
-    val tempFile = File.createTempFile("csv-test", "tmp")
-    tempFile.deleteOnExit()
-    val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), "UTF-8")
-    tmpWriter.write(csvRecords.mkString("$"))
-    tmpWriter.close()
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val csvTable = new CsvTableSource(
-      tempFile.getAbsolutePath,
-      Array("first", "id", "score", "last"),
-      Array(
-        BasicTypeInfo.STRING_TYPE_INFO,
-        BasicTypeInfo.INT_TYPE_INFO,
-        BasicTypeInfo.DOUBLE_TYPE_INFO,
-        BasicTypeInfo.STRING_TYPE_INFO
-      ),
-      fieldDelim = "#",
-      rowDelim = "$",
-      ignoreFirstLine = true,
-      ignoreComments = "%"
-    )
-
-    tEnv.registerTableSource("csvTable", csvTable)
-    tEnv.sql(
-      "SELECT last, score, id FROM csvTable WHERE id < 4 ")
-      .toDataStream[Row]
-      .addSink(new StreamITCase.StringSink)
-
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "Smith,12.3,1",
-      "Taylor,45.6,2",
-      "Miller,7.89,3")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}
-
-
-class TestStreamTableSource(val numRecords: Int) extends StreamTableSource[Row] {
-
-  val fieldTypes: Array[TypeInformation[_]] = Array(
-    BasicTypeInfo.STRING_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO,
-    BasicTypeInfo.INT_TYPE_INFO
-  )
-
-  /** Returns the data of the table as a [[DataStream]]. */
-  override def getDataStream(execEnv: environment.StreamExecutionEnvironment): DataStream[Row] = {
-    execEnv.addSource(new GeneratingSourceFunction(numRecords), getReturnType).setParallelism(1)
-  }
-
-  /** Returns the types of the table fields. */
-  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
-
-  /** Returns the names of the table fields. */
-  override def getFieldsNames: Array[String] = Array("name", "id", "amount")
-
-  /** Returns the [[TypeInformation]] for the return type. */
-  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
-
-  /** Returns the number of fields of the table. */
-  override def getNumberOfFields: Int = 3
-}
-
-class GeneratingSourceFunction(val num: Long) extends SourceFunction[Row] {
-
-  var running = true
-
-  override def run(ctx: SourceContext[Row]): Unit = {
-    var cnt = 0L
-    while(running && cnt < num) {
-      val out = new Row(3)
-      out.setField(0, s"Record_$cnt")
-      out.setField(1, cnt)
-      out.setField(2, (cnt % 16).toInt)
-
-      ctx.collect(out)
-      cnt += 1
-    }
-  }
-
-  override def cancel(): Unit = {
-    running = false
-  }
-}
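
The generating source can also be exercised directly as a DataStream source, which helps when debugging the records a table source test expects. A minimal sketch, reusing the classes above and supplying the row type as the implicit TypeInformation the Scala API requires:

  implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.LONG_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO)

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // emits Record_0,0,0 / Record_1,1,1 / Record_2,2,2
  env.addSource(new GeneratingSourceFunction(3)).print()
  env.execute()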

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala
deleted file mode 100644
index c4ca964..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/SqlITCase.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.{StreamTestData, StreamITCase}
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit._
-
-import scala.collection.mutable
-
-class SqlITCase extends StreamingMultipleProgramsTestBase {
-
-  /** test selection **/
-  @Test
-  def testSelectExpressionFromTable(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val sqlQuery = "SELECT a * 2, b - 1 FROM MyTable"
-
-    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", t)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("2,0", "4,1", "6,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test filtering with registered table **/
-  @Test
-  def testSimpleFilter(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE a = 3"
-
-    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", t)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test filtering with registered datastream **/
-  @Test
-  def testDatastreamFilter(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
-
-    val t = StreamTestData.getSmall3TupleDataStream(env)
-    tEnv.registerDataStream("MyTable", t)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test union with registered tables **/
-  @Test
-  def testUnion(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val sqlQuery = "SELECT * FROM T1 " +
-      "UNION ALL " +
-      "SELECT * FROM T2"
-
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("T1", t1)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("T2", t2)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "1,1,Hi", "1,1,Hi",
-      "2,2,Hello", "2,2,Hello",
-      "3,2,Hello world", "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test union with filter **/
-  @Test
-  def testUnionWithFilter(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val sqlQuery = "SELECT * FROM T1 WHERE a = 3 " +
-      "UNION ALL " +
-      "SELECT * FROM T2 WHERE a = 2"
-
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("T1", t1)
-    val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("T2", t2)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "2,2,Hello",
-      "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  /** test union of a table and a datastream **/
-  @Test
-  def testUnionTableWithDataStream(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val sqlQuery = "SELECT c FROM T1 WHERE a = 3 " +
-      "UNION ALL " +
-      "SELECT c FROM T2 WHERE a = 2"
-
-    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("T1", t1)
-    val t2 = StreamTestData.get3TupleDataStream(env)
-    tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
-
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("Hello", "Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}
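
Note the two registration styles used above: registering a raw DataStream exposes the tuple fields as _1, _2, _3, while passing field expressions renames them for SQL. A minimal sketch of the renamed variant:

  val t = StreamTestData.getSmall3TupleDataStream(env)
  tEnv.registerDataStream("MyTable", t, 'a, 'b, 'c)

  // the renamed columns are now addressable in SQL
  tEnv.sql("SELECT a, c FROM MyTable WHERE b = 2")
    .toDataStream[Row]
    .addSink(new StreamITCase.StringSink)
  env.execute()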

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
deleted file mode 100644
index 21629e4..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2}
-import org.apache.flink.api.table.utils._
-import org.apache.flink.api.table.utils.TableTestUtil._
-import org.junit.Test
-
-class UserDefinedTableFunctionTest extends TableTestBase {
-
-  @Test
-  def testCrossJoin(): Unit = {
-    val util = streamTestUtil()
-    val func1 = new TableFunc1
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func1", func1)
-
-    val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func1($cor0.c)"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery, expected)
-
-    // test overloading
-
-    val sqlQuery2 = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c, '$')) AS T(s)"
-
-    val expected2 = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func1($cor0.c, '$')"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery2, expected2)
-  }
-
-  @Test
-  def testLeftOuterJoin(): Unit = {
-    val util = streamTestUtil()
-    val func1 = new TableFunc1
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func1", func1)
-
-    val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func1($cor0.c)"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "LEFT")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testCustomType(): Unit = {
-    val util = streamTestUtil()
-    val func2 = new TableFunc2
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func2", func2)
-
-    val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func2($cor0.c)"),
-        term("function", func2.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-               "VARCHAR(2147483647) f0, INTEGER f1)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS name", "f1 AS len")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testHierarchyType(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = new HierarchyTableFunction
-    util.addFunction("hierarchy", function)
-
-    val sqlQuery = "SELECT c, T.* FROM MyTable, LATERAL TABLE(hierarchy(c)) AS T(name, adult, len)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "hierarchy($cor0.c)"),
-        term("function", function.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
-               " VARCHAR(2147483647) f0, BOOLEAN f1, INTEGER f2)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS name", "f1 AS adult", "f2 AS len")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testPojoType(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    val function = new PojoTableFunc
-    util.addFunction("pojo", function)
-
-    val sqlQuery = "SELECT c, name, age FROM MyTable, LATERAL TABLE(pojo(c))"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "pojo($cor0.c)"),
-        term("function", function.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
-               " INTEGER age, VARCHAR(2147483647) name)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "name", "age")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testFilter(): Unit = {
-    val util = streamTestUtil()
-    val func2 = new TableFunc2
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func2", func2)
-
-    val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len) " +
-      "WHERE len > 2"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func2($cor0.c)"),
-        term("function", func2.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-               "VARCHAR(2147483647) f0, INTEGER f1)"),
-        term("joinType", "INNER"),
-        term("condition", ">($1, 2)")
-      ),
-      term("select", "c", "f0 AS name", "f1 AS len")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testScalarFunction(): Unit = {
-    val util = streamTestUtil()
-    val func1 = new TableFunc1
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-    util.addFunction("func1", func1)
-
-    val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(SUBSTRING(c, 2))) AS T(s)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      unaryNode(
-        "DataStreamCorrelate",
-        streamTableNode(0),
-        term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
-        term("function", func1.getClass.getCanonicalName),
-        term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
-        term("joinType", "INNER")
-      ),
-      term("select", "c", "f0 AS s")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-}
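
For reference, a table function compatible with these plans extends TableFunction and emits result rows via collect(). The actual TableFunc1/TableFunc2 live in the shared test utils; this split-based stand-in is only a minimal sketch, assuming the org.apache.flink.api.table.functions.TableFunction base class of this package layout:

  import org.apache.flink.api.table.functions.TableFunction

  class SplitFunc extends TableFunction[String] {
    // each collected String becomes one joined row
    def eval(str: String): Unit = str.split(" ").foreach(collect)
  }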

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
deleted file mode 100644
index d398556..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark
-import org.apache.flink.api.scala.stream.utils.StreamITCase
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table._
-import org.apache.flink.types.Row
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-/**
-  * We only test some aggregations until better testing of constructed DataStream
-  * programs is possible.
-  */
-class AggregationsITCase extends StreamingMultipleProgramsTestBase {
-
-  val data = List(
-    (1L, 1, "Hi"),
-    (2L, 2, "Hello"),
-    (4L, 2, "Hello"),
-    (8L, 3, "Hello world"),
-    (16L, 3, "Hello world"))
-
-  @Test
-  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Slide over 2.rows every 1.rows)
-      .select('string, 'int.count, 'int.avg)
-
-    val results = windowedTable.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = Seq("Hello world,1,3", "Hello world,2,3", "Hello,1,2", "Hello,2,2", "Hi,1,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testEventTimeSessionGroupWindowOverTime(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env
-      .fromCollection(data)
-      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
-    val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Session withGap 7.milli on 'rowtime)
-      .select('string, 'int.count)
-
-    val results = windowedTable.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = Seq("Hello world,1", "Hello world,1", "Hello,2", "Hi,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 2.rows)
-      .select('int.count)
-
-    val results = windowedTable.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = Seq("2", "2")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testEventTimeTumblingWindow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env
-      .fromCollection(data)
-      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
-    val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Tumble over 5.milli on 'rowtime as 'w)
-      .select('string, 'int.count, 'int.avg, 'w.start, 'w.end)
-
-    val results = windowedTable.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = Seq(
-      "Hello world,1,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
-      "Hello world,1,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
-      "Hello,2,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
-      "Hi,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testEventTimeSlidingWindow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env
-      .fromCollection(data)
-      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
-    val table = stream.toTable(tEnv, 'long, 'int, 'string)
-
-    val windowedTable = table
-      .groupBy('string)
-      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
-      .select('string, 'int.count, 'w.start, 'w.end, 'w.start)
-
-    val results = windowedTable.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = Seq(
-      "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0",
-      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015,1970-01-01 00:00:00.005",
-      "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02,1970-01-01 00:00:00.01",
-      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025,1970-01-01 00:00:00.015",
-      "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995",
-      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0",
-      "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995",
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}
-
-object GroupWindowITCase {
-  class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
-
-    override def checkAndGetNextWatermark(
-        lastElement: (Long, Int, String),
-        extractedTimestamp: Long)
-      : Watermark = {
-      new Watermark(extractedTimestamp)
-    }
-
-    override def extractTimestamp(
-        element: (Long, Int, String),
-        previousElementTimestamp: Long): Long = {
-      element._1
-    }
-  }
-}
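
The window clauses above cover count-based sliding, event-time session, count-based tumbling, and event-time tumbling/sliding windows. A processing-time variant simply omits the on 'rowtime part and needs no timestamp/watermark assigner; a minimal sketch over the same table:

  val windowedTable = table
    .groupBy('string)
    .window(Tumble over 5.milli as 'w) // processing time: no "on 'rowtime"
    .select('string, 'int.count)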