Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:31 UTC

[22/44] flink git commit: [FLINK-6617] [table] Restructuring of tests

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/CalcITCase.scala
deleted file mode 100644
index 2e8f206..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/CalcITCase.scala
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.StreamTestData
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.expressions.utils.{RichFunc1, RichFunc2}
-import org.apache.flink.table.runtime.datastream.StreamITCase
-import org.apache.flink.table.utils.UserDefinedFunctionTestUtils
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class CalcITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testSimpleSelectAll(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
-
-    val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSelectFirst(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
-
-    val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList("1", "2", "3")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSimpleSelectWithNaming(): Unit = {
-
-    // Verifies that ProjectMergeRule merges the two consecutive projections.
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
-      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
-      .select('a, 'b)
-
-    val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4",
-      "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5",
-      "16,6", "17,6", "18,6", "19,6", "20,6", "21,6")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSimpleSelectAllWithAs(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('a, 'b, 'c)
-
-    val results = ds.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSimpleFilter(): Unit = {
-    /*
-     * Test simple filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter('a === 3)
-    val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList("3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testAllRejectingFilter(): Unit = {
-    /*
-     * Test all-rejecting filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(false) )
-    val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    assertTrue(StreamITCase.testResults.isEmpty)
-  }
-
-  @Test
-  def testAllPassingFilter(): Unit = {
-    /*
-     * Test all-passing filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(true) )
-    val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testFilterOnIntegerTupleField(): Unit = {
-    /*
-     * Test filter on Integer tuple field.
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 === 0 )
-    val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "2,2,Hello", "4,3,Hello world, how are you?",
-      "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
-      "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
-      "18,6,Comment#12", "20,6,Comment#14")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testNotEquals(): Unit = {
-    /*
-     * Test not-equals filter on Integer tuple field.
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 !== 0)
-    val results = filterDs.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-    val expected = mutable.MutableList(
-      "1,1,Hi", "3,2,Hello world",
-      "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
-      "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
-      "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testUserDefinedFunctionWithParameter(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
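-    // RichFunc2 prefixes the value of the "string.value" job parameter set
-    // below to its argument, so "Hello" becomes "ABC#Hello".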
-    tEnv.registerFunction("RichFunc2", new RichFunc2)
-    UserDefinedFunctionTestUtils.setJobParameters(env, Map("string.value" -> "ABC"))
-
-    StreamITCase.testResults = mutable.MutableList()
-
-    val result = StreamTestData.get3TupleDataStream(env)
-      .toTable(tEnv, 'a, 'b, 'c)
-      .where("RichFunc2(c)='ABC#Hello'")
-      .select('c)
-
-    val results = result.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList("Hello")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testMultipleUserDefinedFunctions(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.registerFunction("RichFunc1", new RichFunc1)
-    tEnv.registerFunction("RichFunc2", new RichFunc2)
-    UserDefinedFunctionTestUtils.setJobParameters(env, Map("string.value" -> "Abc"))
-
-    StreamITCase.testResults = mutable.MutableList()
-
-    val result = StreamTestData.get3TupleDataStream(env)
-      .toTable(tEnv, 'a, 'b, 'c)
-      .where("RichFunc2(c)='Abc#Hello' || RichFunc1(a)=3 && b=2")
-      .select('c)
-
-    val results = result.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList("Hello", "Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupAggregationsITCase.scala
deleted file mode 100644
index 8df14e8..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupAggregationsITCase.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.table
-
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.datastream.StreamITCase.RetractingSink
-import org.apache.flink.table.api.scala.stream.utils.StreamTestData
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
-import org.apache.flink.table.runtime.datastream.{StreamITCase, StreamingWithStateTestBase}
-import org.apache.flink.types.Row
-import org.junit.Assert.assertEquals
-import org.junit.Test
-
-import scala.collection.mutable
-
-/**
-  * Tests for non-windowed groupBy aggregations.
-  */
-class GroupAggregationsITCase extends StreamingWithStateTestBase {
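-  // Keep idle per-key state for at least 1 hour and at most 2 hours, so that
-  // state of keys that stop receiving updates is eventually cleaned up.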
-  private val queryConfig = new StreamQueryConfig()
-  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
-
-  @Test
-  def testDistinct(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('b).distinct()
-
-    val results = t.toRetractStream[Row](queryConfig)
-    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
-    env.execute()
-
-    val expected = mutable.MutableList("1", "2", "3", "4", "5", "6")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-
-  @Test
-  def testDistinctAfterAggregate(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .groupBy('e).select('e, 'a.count).distinct()
-
-    val results = t.toRetractStream[Row](queryConfig)
-    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
-    env.execute()
-
-    val expected = mutable.MutableList("1,5", "2,7", "3,3")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-
-  @Test
-  def testNonKeyedGroupAggregate(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-            .select('a.sum, 'b.sum)
-
-    val results = t.toRetractStream[Row](queryConfig)
-    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
-    env.execute()
-
-    val expected = List("231,91")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-
-  @Test
-  def testGroupAggregate(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('b, 'a.sum)
-
-    val results = t.toRetractStream[Row](queryConfig)
-    results.addSink(new StreamITCase.RetractingSink)
-    env.execute()
-
-    val expected = List("1,1", "2,5", "3,15", "4,34", "5,65", "6,111")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-
-  @Test
-  def testDoubleGroupAggregation(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
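-    // Two chained groupBys: the second aggregation has to consume the
-    // retraction messages produced by the first one.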
-    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('a.count as 'cnt, 'b)
-      .groupBy('cnt)
-      .select('cnt, 'b.count as 'freq)
-
-    val results = t.toRetractStream[Row](queryConfig)
-
-    results.addSink(new RetractingSink)
-    env.execute()
-    val expected = List("1,1", "2,1", "3,1", "4,1", "5,1", "6,1")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-
-  @Test
-  def testGroupAggregateWithExpression(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .groupBy('e, 'b % 3)
-      .select('c.min, 'e, 'a.avg, 'd.count)
-
-    val results = t.toRetractStream[Row](queryConfig)
-    results.addSink(new RetractingSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "0,1,1,1", "7,1,4,2", "2,1,3,2",
-      "3,2,3,3", "1,2,3,3", "14,2,5,1",
-      "12,3,5,1", "5,3,4,2")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupWindowAggregationsITCase.scala
deleted file mode 100644
index 1af85d9..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupWindowAggregationsITCase.scala
+++ /dev/null
@@ -1,446 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.table
-
-import java.math.BigDecimal
-
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge}
-import org.apache.flink.table.functions.aggfunctions.CountAggFunction
-import org.apache.flink.table.runtime.datastream.StreamITCase
-import org.apache.flink.table.runtime.datastream.table.GroupWindowAggregationsITCase._
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-/**
-  * We only test some aggregations until better testing of constructed DataStream
-  * programs is possible.
-  */
-class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
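-  // Keep idle per-key state for at least 1 hour and at most 2 hours, so that
-  // state of keys that stop receiving updates is eventually cleaned up.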
-  private val queryConfig = new StreamQueryConfig()
-  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
-
-  val data = List(
-    (1L, 1, "Hi"),
-    (2L, 2, "Hello"),
-    (4L, 2, "Hello"),
-    (8L, 3, "Hello world"),
-    (16L, 3, "Hello world"))
-
-  val data2 = List(
-    (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
-    (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
-    (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
-    (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
-    (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
-    (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"),
-    (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"))
-
-  @Test
-  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
-
-    val countFun = new CountAggFunction
-    val weightAvgFun = new WeightedAvg
-
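-    // Count-based sliding window: fires for every row and aggregates the
-    // last two rows per key.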
-    val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
-      .groupBy('w, 'string)
-      .select('string, countFun('int), 'int.avg,
-              weightAvgFun('long, 'int), weightAvgFun('int, 'int))
-
-    val results = windowedTable.toAppendStream[Row](queryConfig)
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq("Hello world,1,3,8,3", "Hello world,2,3,12,3", "Hello,1,2,2,2",
-                       "Hello,2,2,3,2", "Hi,1,1,1,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testEventTimeSessionGroupWindowOverTime(): Unit = {
-    // To verify the "merge" functionality, this test has two characteristics:
-    // 1. the parallelism is set to 1 and the test data arrives out of order
-    // 2. the watermark trails the timestamps by 10 ms to delay window emission
-    val sessionWindowTestdata = List(
-      (1L, 1, "Hello"),
-      (2L, 2, "Hello"),
-      (8L, 8, "Hello"),
-      (9L, 9, "Hello World"),
-      (4L, 4, "Hello"),
-      (16L, 16, "Hello"))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setParallelism(1)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val countFun = new CountAggFunction
-    val weightAvgFun = new WeightedAvgWithMerge
-
-    val stream = env
-      .fromCollection(sessionWindowTestdata)
-      .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](10L))
-    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
-
-    val windowedTable = table
-      .window(Session withGap 5.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, countFun('int), 'int.avg,
-              weightAvgFun('long, 'int), weightAvgFun('int, 'int))
-
-    val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq("Hello World,1,9,9,9", "Hello,1,16,16,16", "Hello,4,3,5,5")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
-    val countFun = new CountAggFunction
-    val weightAvgFun = new WeightedAvg
-
-    val windowedTable = table
-      .window(Tumble over 2.rows on 'proctime as 'w)
-      .groupBy('w)
-      .select(countFun('string), 'int.avg,
-              weightAvgFun('long, 'int), weightAvgFun('int, 'int))
-
-    val results = windowedTable.toAppendStream[Row](queryConfig)
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq("2,1,1,1", "2,2,6,2")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testEventTimeTumblingWindow(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env
-      .fromCollection(data)
-      .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](0L))
-    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
-    val countFun = new CountAggFunction
-    val weightAvgFun = new WeightedAvg
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, countFun('string), 'int.avg, weightAvgFun('long, 'int),
-              weightAvgFun('int, 'int), 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
-
-    val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq(
-      "Hello world,1,3,8,3,3,3,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
-      "Hello world,1,3,16,3,3,3,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
-      "Hello,2,2,3,2,2,2,4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
-      "Hi,1,1,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testGroupWindowWithoutKeyInProjection(): Unit = {
-    val data = List(
-      (1L, 1, "Hi", 1, 1),
-      (2L, 2, "Hello", 2, 2),
-      (4L, 2, "Hello", 2, 2),
-      (8L, 3, "Hello world", 3, 3),
-      (16L, 3, "Hello world", 3, 3))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'int2, 'int3, 'proctime.proctime)
-
-    val weightAvgFun = new WeightedAvg
-
-    val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
-      .groupBy('w, 'int2, 'int3, 'string)
-      .select(weightAvgFun('long, 'int))
-
-    val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq("12", "8", "2", "3", "1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-
-  // ----------------------------------------------------------------------------------------------
-  // Sliding windows
-  // ----------------------------------------------------------------------------------------------
-
-  @Test
-  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
-    // please keep this test in sync with the DataSet variant
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env
-      .fromCollection(data2)
-      .assignTimestampsAndWatermarks(
-        new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
-    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
-
-    val windowedTable = table
-      .window(Slide over 5.milli every 2.milli on 'long as 'w)
-      .groupBy('w)
-      .select('int.count, 'w.start, 'w.end)
-
-    val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq(
-      "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013",
-      "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017",
-      "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019",
-      "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
-      "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003",
-      "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011",
-      "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007",
-      "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
-      "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testEventTimeSlidingGroupWindowOverTimeOverlappingFullPane(): Unit = {
-    // please keep this test in sync with the DataSet variant
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env
-      .fromCollection(data2)
-      .assignTimestampsAndWatermarks(
-        new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
-    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
-
-    val windowedTable = table
-      .window(Slide over 10.milli every 5.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count, 'w.start, 'w.end)
-
-    val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq(
-      "Hallo,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
-      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
-      "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
-      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015",
-      "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02",
-      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025",
-      "Hello,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015",
-      "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
-      "Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
-      "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testEventTimeSlidingGroupWindowOverTimeOverlappingSplitPane(): Unit = {
-    // please keep this test in sync with the DataSet variant
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env
-      .fromCollection(data2)
-      .assignTimestampsAndWatermarks(
-        new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
-    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
-
-    val windowedTable = table
-      .window(Slide over 5.milli every 4.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count, 'w.start, 'w.end)
-
-    val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq(
-      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
-      "Hello world,1,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
-      "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013",
-      "Hello world,1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017",
-      "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
-      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
-      "Hello,2,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testEventTimeSlidingGroupWindowOverTimeNonOverlappingFullPane(): Unit = {
-    // please keep this test in sync with the DataSet variant
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env
-      .fromCollection(data2)
-      .assignTimestampsAndWatermarks(
-        new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
-    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
-
-    val windowedTable = table
-      .window(Slide over 5.milli every 10.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count, 'w.start, 'w.end)
-
-    val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq(
-      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
-      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testEventTimeSlidingGroupWindowOverTimeNonOverlappingSplitPane(): Unit = {
-    // please keep this test in sync with the DataSet variant
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env
-      .fromCollection(data2)
-      .assignTimestampsAndWatermarks(
-        new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
-    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
-
-    val windowedTable = table
-      .window(Slide over 3.milli every 10.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count, 'w.start, 'w.end)
-
-    val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq(
-      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testEventTimeGroupWindowWithoutExplicitTimeField(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env
-      .fromCollection(data2)
-      .assignTimestampsAndWatermarks(
-        new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
-      .map(t => (t._2, t._6))
-    val table = stream.toTable(tEnv, 'int, 'string, 'rowtime.rowtime)
-
-    val windowedTable = table
-      .window(Slide over 3.milli every 10.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count, 'w.start, 'w.end)
-
-    val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-    val expected = Seq(
-      "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
-      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}
-
-object GroupWindowAggregationsITCase {
-
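-  /**
-    * Extracts the first tuple field as the event timestamp and emits a
-    * punctuated watermark that trails each extracted timestamp by
-    * `offset` milliseconds.
-    */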
-  class TimestampAndWatermarkWithOffset[T <: Product](
-    offset: Long) extends AssignerWithPunctuatedWatermarks[T] {
-
-    override def checkAndGetNextWatermark(
-        lastElement: T,
-        extractedTimestamp: Long): Watermark = {
-      new Watermark(extractedTimestamp - offset)
-    }
-
-    override def extractTimestamp(
-        element: T,
-        previousElementTimestamp: Long): Long = {
-      element.productElement(0).asInstanceOf[Long]
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/OverWindowITCase.scala
deleted file mode 100644
index 531e26f..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/OverWindowITCase.scala
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvg
-import org.apache.flink.table.api.java.utils.UserDefinedScalarFunctions.JavaFunc0
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.functions.aggfunctions.CountAggFunction
-import org.apache.flink.table.runtime.datastream.{StreamITCase, StreamingWithStateTestBase}
-import org.apache.flink.table.runtime.datastream.table.OverWindowITCase._
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class OverWindowITCase extends StreamingWithStateTestBase {
-
-  @Test
-  def testProcTimeUnBoundedPartitionedRowOver(): Unit = {
-
-    val data = List(
-      (1L, 1, "Hello"),
-      (2L, 2, "Hello"),
-      (3L, 3, "Hello"),
-      (4L, 4, "Hello"),
-      (5L, 5, "Hello"),
-      (6L, 6, "Hello"),
-      (7L, 7, "Hello World"),
-      (8L, 8, "Hello World"),
-      (20L, 20, "Hello World"))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-    val countFun = new CountAggFunction
-    val weightAvgFun = new WeightedAvg
-
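-    // Unbounded proc-time OVER window: every output row aggregates all rows
-    // seen so far for its partition key 'c.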
-    val windowedTable = table
-      .window(
-        Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
-      .select('c, countFun('b) over 'w as 'mycount, weightAvgFun('a, 'b) over 'w as 'wAvg)
-      .select('c, 'mycount, 'wAvg)
-
-    val results = windowedTable.toAppendStream[Row]
-    results.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = Seq(
-      "Hello World,1,7", "Hello World,2,7", "Hello World,3,14",
-      "Hello,1,1", "Hello,2,1", "Hello,3,2", "Hello,4,3", "Hello,5,3", "Hello,6,4")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    StreamITCase.clear
-    env.setParallelism(1)
-
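-    // Left entries are (timestamp, record) pairs, Right entries are
-    // watermarks; see RowTimeSourceFunction at the bottom of this file.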
-    val data = Seq(
-      Left(14000005L, (1, 1L, "Hi")),
-      Left(14000000L, (2, 1L, "Hello")),
-      Left(14000002L, (1, 1L, "Hello")),
-      Left(14000002L, (1, 2L, "Hello")),
-      Left(14000002L, (1, 3L, "Hello world")),
-      Left(14000003L, (2, 2L, "Hello world")),
-      Left(14000003L, (2, 3L, "Hello world")),
-      Right(14000020L),
-      Left(14000021L, (1, 4L, "Hello world")),
-      Left(14000022L, (1, 5L, "Hello world")),
-      Left(14000022L, (1, 6L, "Hello world")),
-      Left(14000022L, (1, 7L, "Hello world")),
-      Left(14000023L, (2, 4L, "Hello world")),
-      Left(14000023L, (2, 5L, "Hello world")),
-      Right(14000030L)
-    )
-    val table = env
-      .addSource(new RowTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-    val countFun = new CountAggFunction
-    val weightAvgFun = new WeightedAvg
-    val plusOne = new JavaFunc0
-
-    val windowedTable = table
-      .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE following
-         CURRENT_RANGE as 'w)
-      .select(
-        'a, 'b, 'c,
-        'b.sum over 'w,
-        "SUM:".toExpr + ('b.sum over 'w),
-        countFun('b) over 'w,
-        (countFun('b) over 'w) + 1,
-        plusOne(countFun('b) over 'w),
-        array('b.avg over 'w, 'b.max over 'w),
-        'b.avg over 'w,
-        'b.max over 'w,
-        'b.min over 'w,
-        ('b.min over 'w).abs(),
-        weightAvgFun('b, 'a) over 'w)
-
-    val result = windowedTable.toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "1,1,Hello,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2",
-      "1,2,Hello,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2",
-      "1,3,Hello world,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2",
-      "1,1,Hi,7,SUM:7,4,5,5,[1, 3],1,3,1,1,1",
-      "2,1,Hello,1,SUM:1,1,2,2,[1, 1],1,1,1,1,1",
-      "2,2,Hello world,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2",
-      "2,3,Hello world,6,SUM:6,3,4,4,[2, 3],2,3,1,1,2",
-      "1,4,Hello world,11,SUM:11,5,6,6,[2, 4],2,4,1,1,2",
-      "1,5,Hello world,29,SUM:29,8,9,9,[3, 7],3,7,1,1,3",
-      "1,6,Hello world,29,SUM:29,8,9,9,[3, 7],3,7,1,1,3",
-      "1,7,Hello world,29,SUM:29,8,9,9,[3, 7],3,7,1,1,3",
-      "2,4,Hello world,15,SUM:15,5,6,6,[3, 5],3,5,1,1,3",
-      "2,5,Hello world,15,SUM:15,5,6,6,[3, 5],3,5,1,1,3"
-    )
-
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testProcTimeBoundedPartitionedRowsOver(): Unit = {
-
-    val data = List(
-      (1, 1L, 0, "Hallo", 1L),
-      (2, 2L, 1, "Hallo Welt", 2L),
-      (2, 3L, 2, "Hallo Welt wie", 1L),
-      (3, 4L, 3, "Hallo Welt wie gehts?", 2L),
-      (3, 5L, 4, "ABC", 2L),
-      (3, 6L, 5, "BCD", 3L),
-      (4, 7L, 6, "CDE", 2L),
-      (4, 8L, 7, "DEF", 1L),
-      (4, 9L, 8, "EFG", 1L),
-      (4, 10L, 9, "FGH", 2L),
-      (5, 11L, 10, "GHI", 1L),
-      (5, 12L, 11, "HIJ", 3L),
-      (5, 13L, 12, "IJK", 3L),
-      (5, 14L, 13, "JKL", 2L),
-      (5, 15L, 14, "KLM", 2L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setParallelism(1)
-    StreamITCase.testResults = mutable.MutableList()
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
-
-    val windowedTable = table
-      .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
-      .select('a, 'c.sum over 'w, 'c.min over 'w)
-    val result = windowedTable.toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "1,0,0",
-      "2,1,1",
-      "2,3,1",
-      "3,3,3",
-      "3,7,3",
-      "3,12,3",
-      "4,6,6",
-      "4,13,6",
-      "4,21,6",
-      "4,30,6",
-      "5,10,10",
-      "5,21,10",
-      "5,33,10",
-      "5,46,10",
-      "5,60,10")
-
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeBoundedPartitionedRowOver(): Unit = {
-    val data = Seq(
-      Left((1L, (1L, 1, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((2L, (2L, 2, "Hello"))),
-      Left((1L, (1L, 1, "Hello"))),
-      Left((3L, (7L, 7, "Hello World"))),
-      Left((1L, (7L, 7, "Hello World"))),
-      Left((1L, (7L, 7, "Hello World"))),
-      Right(2L),
-      Left((3L, (3L, 3, "Hello"))),
-      Left((4L, (4L, 4, "Hello"))),
-      Left((5L, (5L, 5, "Hello"))),
-      Left((6L, (6L, 6, "Hello"))),
-      Left((20L, (20L, 20, "Hello World"))),
-      Right(6L),
-      Left((8L, (8L, 8, "Hello World"))),
-      Left((7L, (7L, 7, "Hello World"))),
-      Right(20L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(1)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val table = env.addSource[(Long, Int, String)](
-      new RowTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
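-    // Row-based OVER window: aggregates the current row and the two
-    // preceding rows per partition key, ordered by event time.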
-    val windowedTable = table
-      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
-      .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
-
-    val result = windowedTable.toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
-      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
-      "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
-      "Hello,6,3,15",
-      "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
-      "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testRowTimeBoundedPartitionedRangeOver(): Unit = {
-    val data = Seq(
-      Left((1500L, (1L, 15, "Hello"))),
-      Left((1600L, (1L, 16, "Hello"))),
-      Left((1000L, (1L, 1, "Hello"))),
-      Left((2000L, (2L, 2, "Hello"))),
-      Right(1000L),
-      Left((2000L, (2L, 2, "Hello"))),
-      Left((2000L, (2L, 3, "Hello"))),
-      Left((3000L, (3L, 3, "Hello"))),
-      Right(2000L),
-      Left((4000L, (4L, 4, "Hello"))),
-      Right(3000L),
-      Left((5000L, (5L, 5, "Hello"))),
-      Right(5000L),
-      Left((6000L, (6L, 6, "Hello"))),
-      Left((6500L, (6L, 65, "Hello"))),
-      Right(7000L),
-      Left((9000L, (6L, 9, "Hello"))),
-      Left((9500L, (6L, 18, "Hello"))),
-      Left((9000L, (6L, 9, "Hello"))),
-      Right(10000L),
-      Left((10000L, (7L, 7, "Hello World"))),
-      Left((11000L, (7L, 17, "Hello World"))),
-      Left((11000L, (7L, 77, "Hello World"))),
-      Right(12000L),
-      Left((14000L, (7L, 18, "Hello World"))),
-      Right(14000L),
-      Left((15000L, (8L, 8, "Hello World"))),
-      Right(17000L),
-      Left((20000L, (20L, 20, "Hello World"))),
-      Right(19000L))
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setStateBackend(getStateBackend)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-
-    val table = env.addSource[(Long, Int, String)](
-      new RowTimeSourceFunction[(Long, Int, String)](data))
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
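-    // Range-based OVER window: aggregates all rows of the partition whose
-    // timestamp lies within 1 second preceding the current row's timestamp.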
-    val windowedTable = table
-      .window(
-        Over partitionBy 'c orderBy 'rowtime preceding 1.seconds following CURRENT_RANGE as 'w)
-      .select('c, 'b, 'a.count over 'w, 'a.sum over 'w)
-
-    val result = windowedTable.toAppendStream[Row]
-    result.addSink(new StreamITCase.StringSink[Row])
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
-      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
-      "Hello,3,4,9",
-      "Hello,4,2,7",
-      "Hello,5,2,9",
-      "Hello,6,2,11", "Hello,65,2,12",
-      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
-      "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
-      "Hello World,8,2,15",
-      "Hello World,20,1,20")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}
-
-object OverWindowITCase {
-
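-  /**
-    * Emits `Left((timestamp, element))` entries as timestamped records and
-    * `Right(timestamp)` entries as watermarks, in input order.
-    */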
-  class RowTimeSourceFunction[T](
-      dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
-    override def run(ctx: SourceContext[T]): Unit = {
-      dataWithTimestampList.foreach {
-        case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
-        case Right(w) => ctx.emitWatermark(new Watermark(w))
-      }
-    }
-
-    override def cancel(): Unit = {} // bounded source, nothing to cancel
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/RetractionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/RetractionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/RetractionITCase.scala
deleted file mode 100644
index e8cfd75..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/RetractionITCase.scala
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.datastream.{StreamITCase, StreamingWithStateTestBase}
-import org.apache.flink.table.utils.TableFunc0
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-/**
-  * Tests for retraction.
-  */
-class RetractionITCase extends StreamingWithStateTestBase {
-  // input data
-  val data = List(
-    ("Hello", 1),
-    ("word", 1),
-    ("Hello", 1),
-    ("bark", 1),
-    ("bark", 1),
-    ("bark", 1),
-    ("bark", 1),
-    ("bark", 1),
-    ("bark", 1),
-    ("flink", 1)
-  )
-
-  // keyed groupby + keyed groupby
-  @Test
-  def testWordCount(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-    env.setStateBackend(getStateBackend)
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'word, 'num)
-    val resultTable = table
-      .groupBy('word)
-      .select('num.sum as 'count)
-      .groupBy('count)
-      .select('count, 'count.count as 'frequency)
-
-    val results = resultTable.toRetractStream[Row]
-    results.addSink(new StreamITCase.RetractingSink)
-    env.execute()
-
-    val expected = Seq("1,2", "2,1", "6,1")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-
-  // keyed groupby + non-keyed groupby
-  @Test
-  def testGroupByAndNonKeyedGroupBy(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-    env.setStateBackend(getStateBackend)
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'word, 'num)
-    val resultTable = table
-      .groupBy('word)
-      .select('word as 'word, 'num.sum as 'cnt)
-      .select('cnt.sum)
-
-    val results = resultTable.toRetractStream[Row]
-
-    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
-    env.execute()
-
-    val expected = Seq("10")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-
-  // non-keyed groupby + keyed groupby
-  @Test
-  def testNonKeyedGroupByAndGroupBy(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-    env.setStateBackend(getStateBackend)
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'word, 'num)
-    val resultTable = table
-      .select('num.sum as 'count)
-      .groupBy('count)
-      .select('count, 'count.count)
-
-    val results = resultTable.toRetractStream[Row]
-    results.addSink(new StreamITCase.RetractingSink)
-    env.execute()
-
-    val expected = Seq("10,1")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-
-  // Test the unique process: if the current output message of an unbounded
-  // groupBy equals the previous message, the unbounded groupBy ignores it.
-  @Test
-  def testUniqueProcess(): Unit = {
-    // input data
-    val data = List(
-      (1, 1L),
-      (2, 2L),
-      (3, 3L),
-      (3, 3L),
-      (4, 1L),
-      (4, 0L),
-      (4, 0L),
-      (4, 0L),
-      (5, 1L),
-      (6, 6L),
-      (6, 6L),
-      (6, 6L),
-      (7, 8L)
-    )
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-    env.setStateBackend(getStateBackend)
-    env.setParallelism(1)
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'pk, 'value)
-    val resultTable = table
-      .groupBy('pk)
-      .select('pk as 'pk, 'value.sum as 'sum)
-      .groupBy('sum)
-      .select('sum, 'pk.count as 'count)
-
-    val results = resultTable.toRetractStream[Row]
-    results.addSink(new StreamITCase.RetractMessagesSink)
-    env.execute()
-
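-    // RetractMessagesSink records the raw change messages: "+" marks an
-    // accumulate message, "-" marks a retract message.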
-    val expected = Seq(
-      "+1,1", "+2,1", "+3,1", "-3,1", "+6,1", "-1,1", "+1,2", "-1,2", "+1,3", "-6,1", "+6,2",
-      "-6,2", "+6,1", "+12,1", "-12,1", "+18,1", "+8,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  // correlate should handle retraction messages correctly
-  @Test
-  def testCorrelate(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.clear
-    env.setStateBackend(getStateBackend)
-
-    val func0 = new TableFunc0
-
-    val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'word, 'num)
-    val resultTable = table
-      .groupBy('word)
-      .select('word as 'word, 'num.sum as 'cnt)
-      .leftOuterJoin(func0('word))
-      .groupBy('cnt)
-      .select('cnt, 'word.count as 'frequency)
-
-    val results = resultTable.toRetractStream[Row]
-    results.addSink(new StreamITCase.RetractingSink)
-    env.execute()
-
-    val expected = Seq("1,2", "2,1", "6,1")
-    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/TableSinksITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/TableSinksITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/TableSinksITCase.scala
deleted file mode 100644
index 8a1e398..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/TableSinksITCase.scala
+++ /dev/null
@@ -1,510 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.table
-
-import java.io.File
-import java.lang.{Boolean => JBool}
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.StreamTestData
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.sinks._
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class TableSinksITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testStreamTableSink(): Unit = {
-
-    val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
-    tmpFile.deleteOnExit()
-    val path = tmpFile.toURI.toString
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setParallelism(4)
-
-    val input = StreamTestData.get3TupleDataStream(env)
-      .map(x => x).setParallelism(4) // increase DOP to 4
-
-    val results = input.toTable(tEnv, 'a, 'b, 'c)
-      .where('a < 5 || 'a > 17)
-      .select('c, 'b)
-      .writeToSink(new CsvTableSink(path))
-
-    env.execute()
-
-    val expected = Seq(
-      "Hi,1", "Hello,2", "Hello world,2", "Hello world, how are you?,3",
-      "Comment#12,6", "Comment#13,6", "Comment#14,6", "Comment#15,6").mkString("\n")
-
-    TestBaseUtils.compareResultsByLinesInMemory(expected, path)
-  }
-
-  @Test
-  def testAppendSinkOnAppendTable(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = StreamTestData.get3TupleDataStream(env)
-        .assignAscendingTimestamps(_._1.toLong)
-        .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
-
-    t.window(Tumble over 5.millis on 'rowtime as 'w)
-      .groupBy('w)
-      .select('w.end, 'id.count, 'num.sum)
-      .writeToSink(new TestAppendSink)
-
-    env.execute()
-
-    val result = RowCollector.getAndClearValues.map(_.f1.toString).sorted
-    val expected = List(
-      "1970-01-01 00:00:00.005,4,8",
-      "1970-01-01 00:00:00.01,5,18",
-      "1970-01-01 00:00:00.015,5,24",
-      "1970-01-01 00:00:00.02,5,29",
-      "1970-01-01 00:00:00.025,2,12")
-      .sorted
-    assertEquals(expected, result)
-  }
-
-  @Test
-  def testRetractSinkOnUpdatingTable(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = StreamTestData.get3TupleDataStream(env)
-      .assignAscendingTimestamps(_._1.toLong)
-      .toTable(tEnv, 'id, 'num, 'text)
-
-    t.select('id, 'num, 'text.charLength() as 'len)
-      .groupBy('len)
-      .select('len, 'id.count, 'num.sum)
-      .writeToSink(new TestRetractSink)
-
-    env.execute()
-    val results = RowCollector.getAndClearValues
-
-    val retracted = retractResults(results).sorted
-    val expected = List(
-      "2,1,1",
-      "5,1,2",
-      "11,1,2",
-      "25,1,3",
-      "10,7,39",
-      "14,1,3",
-      "9,9,41").sorted
-    assertEquals(expected, retracted)
-  }
-
-  @Test
-  def testRetractSinkOnAppendTable(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = StreamTestData.get3TupleDataStream(env)
-      .assignAscendingTimestamps(_._1.toLong)
-      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
-
-    t.window(Tumble over 5.millis on 'rowtime as 'w)
-      .groupBy('w)
-      .select('w.end, 'id.count, 'num.sum)
-      .writeToSink(new TestRetractSink)
-
-    env.execute()
-    val results = RowCollector.getAndClearValues
-
-    assertFalse(
-      "Received retraction messages for append only table",
-      results.exists(!_.f0))
-
-    val retracted = retractResults(results).sorted
-    val expected = List(
-      "1970-01-01 00:00:00.005,4,8",
-      "1970-01-01 00:00:00.01,5,18",
-      "1970-01-01 00:00:00.015,5,24",
-      "1970-01-01 00:00:00.02,5,29",
-      "1970-01-01 00:00:00.025,2,12")
-      .sorted
-    assertEquals(expected, retracted)
-  }
-
-  @Test
-  def testUpsertSinkOnUpdatingTableWithFullKey(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = StreamTestData.get3TupleDataStream(env)
-      .assignAscendingTimestamps(_._1.toLong)
-      .toTable(tEnv, 'id, 'num, 'text)
-
-    t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
-      .groupBy('len, 'cTrue)
-      .select('len, 'id.count as 'cnt, 'cTrue)
-      .groupBy('cnt, 'cTrue)
-      .select('cnt, 'len.count, 'cTrue)
-      .writeToSink(new TestUpsertSink(Array("cnt", "cTrue"), false))
-
-    env.execute()
-    val results = RowCollector.getAndClearValues
-
-    assertTrue(
-      "Results must include delete messages",
-      results.exists(!_.f0))
-
-    val retracted = upsertResults(results, Array(0, 2)).sorted
-    val expected = List(
-      "1,5,true",
-      "7,1,true",
-      "9,1,true").sorted
-    assertEquals(expected, retracted)
-  }
-
-  @Test
-  def testUpsertSinkOnAppendingTableWithFullKey1(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = StreamTestData.get3TupleDataStream(env)
-      .assignAscendingTimestamps(_._1.toLong)
-      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
-
-    t.window(Tumble over 5.millis on 'rowtime as 'w)
-      .groupBy('w, 'num)
-      .select('num, 'w.end as 'wend, 'id.count)
-      .writeToSink(new TestUpsertSink(Array("wend", "num"), true))
-
-    env.execute()
-    val results = RowCollector.getAndClearValues
-
-    assertFalse(
-      "Received retraction messages for append only table",
-      results.exists(!_.f0))
-
-    val retracted = upsertResults(results, Array(0, 1, 2)).sorted
-    val expected = List(
-      "1,1970-01-01 00:00:00.005,1",
-      "2,1970-01-01 00:00:00.005,2",
-      "3,1970-01-01 00:00:00.005,1",
-      "3,1970-01-01 00:00:00.01,2",
-      "4,1970-01-01 00:00:00.01,3",
-      "4,1970-01-01 00:00:00.015,1",
-      "5,1970-01-01 00:00:00.015,4",
-      "5,1970-01-01 00:00:00.02,1",
-      "6,1970-01-01 00:00:00.02,4",
-      "6,1970-01-01 00:00:00.025,2").sorted
-    assertEquals(expected, retracted)
-  }
-
-  @Test
-  def testUpsertSinkOnAppendingTableWithFullKey2(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = StreamTestData.get3TupleDataStream(env)
-      .assignAscendingTimestamps(_._1.toLong)
-      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
-
-    t.window(Tumble over 5.millis on 'rowtime as 'w)
-      .groupBy('w, 'num)
-      .select('w.start as 'wstart, 'w.end as 'wend, 'num, 'id.count)
-      .writeToSink(new TestUpsertSink(Array("wstart", "wend", "num"), true))
-
-    env.execute()
-    val results = RowCollector.getAndClearValues
-
-    assertFalse(
-      "Received retraction messages for append only table",
-      results.exists(!_.f0))
-
-    val retracted = upsertResults(results, Array(0, 1, 2)).sorted
-    val expected = List(
-      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1,1",
-      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,2,2",
-      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,3,1",
-      "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,3,2",
-      "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,4,3",
-      "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,4,1",
-      "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,5,4",
-      "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,5,1",
-      "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,6,4",
-      "1970-01-01 00:00:00.02,1970-01-01 00:00:00.025,6,2").sorted
-    assertEquals(expected, retracted)
-  }
-
-  @Test
-  def testUpsertSinkOnAppendingTableWithoutFullKey1(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = StreamTestData.get3TupleDataStream(env)
-      .assignAscendingTimestamps(_._1.toLong)
-      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
-
-    t.window(Tumble over 5.millis on 'rowtime as 'w)
-      .groupBy('w, 'num)
-      .select('w.end as 'wend, 'id.count as 'cnt)
-      .writeToSink(new TestUpsertSink(null, true))
-
-    env.execute()
-    val results = RowCollector.getAndClearValues
-
-    assertFalse(
-      "Received retraction messages for append only table",
-      results.exists(!_.f0))
-
-    val retracted = results.map(_.f1.toString).sorted
-    val expected = List(
-      "1970-01-01 00:00:00.005,1",
-      "1970-01-01 00:00:00.005,2",
-      "1970-01-01 00:00:00.005,1",
-      "1970-01-01 00:00:00.01,2",
-      "1970-01-01 00:00:00.01,3",
-      "1970-01-01 00:00:00.015,1",
-      "1970-01-01 00:00:00.015,4",
-      "1970-01-01 00:00:00.02,1",
-      "1970-01-01 00:00:00.02,4",
-      "1970-01-01 00:00:00.025,2").sorted
-    assertEquals(expected, retracted)
-  }
-
-  @Test
-  def testUpsertSinkOnAppendingTableWithoutFullKey2(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = StreamTestData.get3TupleDataStream(env)
-      .assignAscendingTimestamps(_._1.toLong)
-      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
-
-    t.window(Tumble over 5.millis on 'rowtime as 'w)
-      .groupBy('w, 'num)
-      .select('num, 'id.count as 'cnt)
-      .writeToSink(new TestUpsertSink(null, true))
-
-    env.execute()
-    val results = RowCollector.getAndClearValues
-
-    assertFalse(
-      "Received retraction messages for append only table",
-      results.exists(!_.f0))
-
-    val retracted = results.map(_.f1.toString).sorted
-    val expected = List(
-      "1,1",
-      "2,2",
-      "3,1",
-      "3,2",
-      "4,3",
-      "4,1",
-      "5,4",
-      "5,1",
-      "6,4",
-      "6,2").sorted
-    assertEquals(expected, retracted)
-  }
-
-  /** Converts a list of retraction messages into a list of final results. */
-  private def retractResults(results: List[JTuple2[JBool, Row]]): List[String] = {
-
-    val retracted = results
-      .foldLeft(Map[String, Int]()){ (m: Map[String, Int], v: JTuple2[JBool, Row]) =>
-        val cnt = m.getOrElse(v.f1.toString, 0)
-        if (v.f0) {
-          m + (v.f1.toString -> (cnt + 1))
-        } else {
-          m + (v.f1.toString -> (cnt - 1))
-        }
-      }.filter{ case (_, c: Int) => c != 0 }
-
-    assertFalse(
-      "Received retracted rows which have not been accumulated.",
-      retracted.exists{ case (_, c: Int) => c < 0})
-
-    retracted.flatMap { case (r: String, c: Int) => (0 until c).map(_ => r) }.toList
-  }
-
-  /** Converts a list of upsert messages into a list of final results. */
-  private def upsertResults(results: List[JTuple2[JBool, Row]], keys: Array[Int]): List[String] = {
-
-    def getKeys(r: Row): List[String] =
-      keys.foldLeft(List[String]())((k, i) => r.getField(i).toString :: k)
-
-    val upserted = results.foldLeft(Map[String, String]()){ (o: Map[String, String], r) =>
-      // Join with a separator so composite keys cannot collide (e.g. "1"+"23" vs. "12"+"3").
-      val key = getKeys(r.f1).mkString("|")
-      if (r.f0) {
-        o + (key -> r.f1.toString)
-      } else {
-        o - key
-      }
-    }
-
-    upserted.values.toList
-  }
-
-}
-
-private[flink] class TestAppendSink extends AppendStreamTableSink[Row] {
-
-  var fNames: Array[String] = _
-  var fTypes: Array[TypeInformation[_]] = _
-
-  override def emitDataStream(s: DataStream[Row]): Unit = {
-    s.map(
-      new MapFunction[Row, JTuple2[JBool, Row]] {
-        override def map(value: Row): JTuple2[JBool, Row] = new JTuple2(true, value)
-      })
-      .addSink(new RowSink)
-  }
-
-  override def getOutputType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
-
-  override def getFieldNames: Array[String] = fNames
-
-  override def getFieldTypes: Array[TypeInformation[_]] = fTypes
-
-  override def configure(
-    fieldNames: Array[String],
-    fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
-    val copy = new TestAppendSink
-    copy.fNames = fieldNames
-    copy.fTypes = fieldTypes
-    copy
-  }
-}
-
-private[flink] class TestRetractSink extends RetractStreamTableSink[Row] {
-
-  var fNames: Array[String] = _
-  var fTypes: Array[TypeInformation[_]] = _
-
-  override def emitDataStream(s: DataStream[JTuple2[JBool, Row]]): Unit = {
-    s.addSink(new RowSink)
-  }
-
-  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
-
-  override def getFieldNames: Array[String] = fNames
-
-  override def getFieldTypes: Array[TypeInformation[_]] = fTypes
-
-  override def configure(
-      fieldNames: Array[String],
-      fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = {
-    val copy = new TestRetractSink
-    copy.fNames = fieldNames
-    copy.fTypes = fieldTypes
-    copy
-  }
-}
-
-private[flink] class TestUpsertSink(
-    expectedKeys: Array[String],
-    expectedIsAppendOnly: Boolean)
-  extends UpsertStreamTableSink[Row] {
-
-  var fNames: Array[String] = _
-  var fTypes: Array[TypeInformation[_]] = _
-
-  override def setKeyFields(keys: Array[String]): Unit =
-    if (keys != null) {
-      assertEquals("Provided key fields do not match expected keys",
-        expectedKeys.sorted.mkString(","),
-        keys.sorted.mkString(","))
-    } else {
-      assertNull("Key fields were expected but none were provided.", expectedKeys)
-    }
-
-  override def setIsAppendOnly(isAppendOnly: JBool): Unit =
-    assertEquals(
-      "Provided isAppendOnly does not match expected isAppendOnly",
-      expectedIsAppendOnly,
-      isAppendOnly)
-
-  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
-
-  override def emitDataStream(s: DataStream[JTuple2[JBool, Row]]): Unit = {
-    s.addSink(new RowSink)
-  }
-
-  override def getFieldNames: Array[String] = fNames
-
-  override def getFieldTypes: Array[TypeInformation[_]] = fTypes
-
-  override def configure(
-      fieldNames: Array[String],
-      fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = {
-    val copy = new TestUpsertSink(expectedKeys, expectedIsAppendOnly)
-    copy.fNames = fieldNames
-    copy.fTypes = fieldTypes
-    copy
-  }
-}
-
-class RowSink extends SinkFunction[JTuple2[JBool, Row]] {
-  override def invoke(value: JTuple2[JBool, Row]): Unit = RowCollector.addValue(value)
-}
-
-object RowCollector {
-  private val sink: mutable.ArrayBuffer[JTuple2[JBool, Row]] =
-    new mutable.ArrayBuffer[JTuple2[JBool, Row]]()
-
-  def addValue(value: JTuple2[JBool, Row]): Unit = {
-    sink.synchronized {
-      sink += value
-    }
-  }
-
-  def getAndClearValues: List[JTuple2[JBool, Row]] = {
-    // Synchronize the read-and-clear as well: sink tasks may add values concurrently.
-    sink.synchronized {
-      val out = sink.toList
-      sink.clear()
-      out
-    }
-  }
-}

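The retractResults() and upsertResults() helpers above reduce a stream of
(flag, row) messages to final results. The upsert case keeps only the latest row
image per key, as in this self-contained sketch (plain Scala, no Flink types; the
names and example key strings are illustrative):

object UpsertSemantics {

  // messages: (isUpsert, key, row). An upsert overwrites the entry for its
  // key, a delete removes it; only the most recent image per key survives.
  def upsertFold(messages: Seq[(Boolean, String, String)]): List[String] =
    messages.foldLeft(Map.empty[String, String]) {
      case (state, (true, key, row)) => state + (key -> row)
      case (state, (false, key, _))  => state - key
    }.values.toList

  def main(args: Array[String]): Unit = {
    val msgs = Seq(
      (true,  "len=1", "1,5,true"),  // insert for key len=1
      (true,  "len=1", "1,6,true"),  // update overwrites the previous image
      (true,  "len=7", "7,1,true"),
      (false, "len=7", "7,1,true")   // delete removes the key entirely
    )
    println(upsertFold(msgs))        // prints List(1,6,true)
  }
}
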
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/TableSourceITCase.scala
deleted file mode 100644
index 07b748c..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/TableSourceITCase.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.runtime.datastream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.runtime.datastream.StreamITCase
-import org.apache.flink.table.utils.{CommonTestData, TestFilterableTableSource}
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class TableSourceITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testCsvTableSource(): Unit = {
-
-    val csvTable = CommonTestData.getCsvTableSource
-    StreamITCase.testResults = mutable.MutableList()
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    tEnv.registerTableSource("csvTable", csvTable)
-    tEnv.scan("csvTable")
-      .where('id > 4)
-      .select('last, 'score * 2)
-      .toAppendStream[Row]
-      .addSink(new StreamITCase.StringSink[Row])
-
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "Williams,69.0",
-      "Miller,13.56",
-      "Smith,180.2",
-      "Williams,4.68")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testCsvTableSourceWithFilterable(): Unit = {
-    StreamITCase.testResults = mutable.MutableList()
-    val tableName = "MyTable"
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    tEnv.registerTableSource(tableName, new TestFilterableTableSource)
-    tEnv.scan(tableName)
-      .where("amount > 4 && price < 9")
-      .select("id, name")
-      .addSink(new StreamITCase.StringSink[Row])
-
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}
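
For intuition on the filterable-source test above: the predicate
"amount > 4 && price < 9" is pushed into TestFilterableTableSource, and only
Record_5 through Record_8 survive. The sketch below reproduces that outcome
under the assumption that each generated row carries amount = id and price = id;
the actual generator may use different values, so treat the row shape as
hypothetical:

object FilterableScanSemantics {

  // Hypothetical row shape, for illustration only.
  case class R(id: Int, name: String, amount: Int, price: Double)

  def main(args: Array[String]): Unit = {
    val rows = (1 to 10).map(i => R(i, s"Record_$i", amount = i, price = i.toDouble))
    val kept = rows.filter(r => r.amount > 4 && r.price < 9)
    kept.foreach(r => println(s"${r.id},${r.name}"))  // 5,Record_5 ... 8,Record_8
  }
}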