Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:48 UTC

[14/15] flink git commit: [FLINK-6093] [table] Add stream TableSinks and DataStream conversion with support for retraction.

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
index 809afd2..7214394 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala
@@ -25,7 +25,6 @@ import org.apache.flink.types.Row
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 
 /**
   * A simple [[TableSink]] to emit data as CSV files.
@@ -40,7 +39,7 @@ class CsvTableSink(
     fieldDelim: Option[String],
     numFiles: Option[Int],
     writeMode: Option[WriteMode])
-  extends TableSinkBase[Row] with BatchTableSink[Row] with StreamTableSink[Row] {
+  extends TableSinkBase[Row] with BatchTableSink[Row] with AppendStreamTableSink[Row] {
 
   /**
     * A simple [[TableSink]] to emit data as CSV files.
@@ -134,100 +133,3 @@ class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] {
     builder.mkString
   }
 }
-
-/**
-  * A simple [[TableSink]] to emit data as CSV files.
-  *
-  * @param path The output path to write the Table to.
-  * @param fieldDelim The field delimiter
-  * @param numFiles The number of files to write to
-  * @param writeMode The write mode to specify whether existing files are overwritten or not.
-  */
-class CsvRetractTableSink(
-    path: String,
-    fieldDelim: Option[String],
-    numFiles: Option[Int],
-    writeMode: Option[WriteMode])
-  extends TableSinkBase[Row] with StreamRetractSink[Row] {
-
-  override def needsUpdatesAsRetraction: Boolean = true
-
-  /**
-    * A simple [[TableSink]] to emit data as CSV files.
-    *
-    * @param path The output path to write the Table to.
-    * @param fieldDelim The field delimiter, ',' by default.
-    */
-  def this(path: String, fieldDelim: String = ",") {
-    this(path, Some(fieldDelim), None, None)
-  }
-
-  /**
-    * A simple [[TableSink]] to emit data as CSV files.
-    *
-    * @param path The output path to write the Table to.
-    * @param fieldDelim The field delimiter.
-    * @param numFiles The number of files to write to.
-    * @param writeMode The write mode to specify whether existing files are overwritten or not.
-    */
-  def this(path: String, fieldDelim: String, numFiles: Int, writeMode: WriteMode) {
-    this(path, Some(fieldDelim), Some(numFiles), Some(writeMode))
-  }
-
-
-  override def emitDataStreamWithChange(dataStream: DataStream[JTuple2[Boolean,Row]]): Unit = {
-    val csvRows = dataStream
-      .map(new CsvRetractFormatter(fieldDelim.getOrElse(",")))
-      .returns(TypeInformation.of(classOf[String]))
-
-
-    if (numFiles.isDefined) {
-      csvRows.setParallelism(numFiles.get)
-    }
-
-    val sink = writeMode match {
-      case None => csvRows.writeAsText(path)
-      case Some(wm) => csvRows.writeAsText(path, wm)
-    }
-
-    if (numFiles.isDefined) {
-      sink.setParallelism(numFiles.get)
-    }
-  }
-
-  override protected def copy: TableSinkBase[Row] = {
-    new CsvRetractTableSink(path, fieldDelim, numFiles, writeMode)
-  }
-
-  override def getOutputType: TypeInformation[Row] = {
-    new RowTypeInfo(getFieldTypes: _*)
-  }
-}
-
-/**
-  * Formats a [[Tuple2]] with change information into a [[String]] with fields separated by the
-  * field delimiter.
-  *
-  * @param fieldDelim The field delimiter.
-  */
-class CsvRetractFormatter(fieldDelim: String) extends MapFunction[JTuple2[Boolean,Row], String] {
-  override def map(rowT: JTuple2[Boolean,Row]): String = {
-
-    val row: Row = rowT.f1
-
-    val builder = new StringBuilder
-
-    builder.append(rowT.f0.toString)
-
-    // write following values
-    for (i <- 0 until row.getArity) {
-      builder.append(fieldDelim)
-      val v = row.getField(i)
-      if (v != null) {
-        builder.append(v.toString)
-      }
-    }
-    builder.mkString
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
new file mode 100644
index 0000000..3ab997e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.api.Types
+
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.Table
+
+/**
+  * Defines an external [[TableSink]] to emit a streaming [[Table]] with insert, update, and delete
+  * changes.
+  *
+  * The table will be converted into a stream of accumulate and retract messages, which are
+  * encoded as [[JTuple2]].
+  * The first field is a [[JBool]] flag to indicate the message type.
+  * The second field holds the record of the requested type [[T]].
+  *
+  * A message with a true [[JBool]] flag is an accumulate (or add) message.
+  * A message with a false flag is a retract message.
+  *
+  * @tparam T Type of records that this [[TableSink]] expects and supports.
+  */
+trait RetractStreamTableSink[T] extends TableSink[JTuple2[JBool, T]] {
+
+  /** Returns the requested record type */
+  def getRecordType: TypeInformation[T]
+
+  /** Emits the DataStream. */
+  def emitDataStream(dataStream: DataStream[JTuple2[JBool, T]]): Unit
+
+  override def getOutputType = new TupleTypeInfo(Types.BOOLEAN, getRecordType)
+
+}
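
For readers following the new API: below is a minimal sketch (not part of this commit) of how
the trait above could be implemented. The PrintRetractSink and ChangeFormatter names are made
up, and the sketch assumes the base TableSink trait declares getFieldNames, getFieldTypes, and
configure, as TableSinkBase implements them for the sinks elsewhere in this patch.

    package org.apache.flink.table.sinks

    import java.lang.{Boolean => JBool}

    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.types.Row

    /** Formats each change message: '+' for accumulate, '-' for retract. */
    class ChangeFormatter extends MapFunction[JTuple2[JBool, Row], String] {
      override def map(v: JTuple2[JBool, Row]): String =
        (if (v.f0) "+" else "-") + v.f1.toString
    }

    /** Prints every accumulate and retract message to stdout. */
    class PrintRetractSink extends RetractStreamTableSink[Row] {

      private var fieldNames: Array[String] = _
      private var fieldTypes: Array[TypeInformation[_]] = _

      override def getRecordType: TypeInformation[Row] =
        new RowTypeInfo(fieldTypes: _*)

      override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
        dataStream
          .map(new ChangeFormatter)
          .returns(TypeInformation.of(classOf[String]))
          .print()
      }

      override def getFieldNames: Array[String] = fieldNames

      override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes

      // Called by the Table API with the schema of the table to emit.
      override def configure(
          fieldNames: Array[String],
          fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = {
        val sink = new PrintRetractSink
        sink.fieldNames = fieldNames
        sink.fieldTypes = fieldTypes
        sink
      }
    }

A table with update or delete changes could then be emitted with
table.writeToSink(new PrintRetractSink), analogous to the sinks used in the tests below.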

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala
deleted file mode 100644
index 7f7c944..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamRetractSink.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks
-
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.streaming.api.datastream.DataStream
-
-trait StreamRetractSink[T] extends TableSink[T]{
-
-  /**
-    * Whether the [[StreamTableSink]] requires that update and delete changes are sent with
-    * retraction messages.
-    */
-  def needsUpdatesAsRetraction: Boolean = false
-
-  /** Emits the DataStream with change information. */
-  def emitDataStreamWithChange(dataStream: DataStream[JTuple2[Boolean,T]]): Unit
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala
deleted file mode 100644
index 360252e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks
-
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.Table
-
-/** Defines an external [[TableSink]] to emit a streaming [[Table]].
-  *
-  * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports.
-  */
-trait StreamTableSink[T] extends TableSink[T] {
-
-  /** Emits the DataStream. */
-  def emitDataStream(dataStream: DataStream[T]): Unit
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
new file mode 100644
index 0000000..2ae3406
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{Table, Types}
+
+/**
+  * Defines an external [[TableSink]] to emit a streaming [[Table]] with insert, update, and delete
+  * changes. The [[Table]] must have unique key fields (atomic or composite) or be append-only.
+  *
+  * If the [[Table]] does not have a unique key and is not append-only, a
+  * [[org.apache.flink.table.api.TableException]] will be thrown.
+  *
+  * The unique key of the table is configured by the [[UpsertStreamTableSink#setKeyFields()]]
+  * method.
+  *
+  * The [[Table]] will be converted into a stream of upsert and delete messages which are encoded as
+  * [[JTuple2]]. The first field is a [[JBool]] flag to indicate the message type. The second field
+  * holds the record of the requested type [[T]].
+  *
+  * A message with a true [[JBool]] flag is an upsert message for the configured key.
+  * A message with a false flag is a delete message for the configured key.
+  *
+  * If the table is append-only, all messages will have a true flag and must be interpreted
+  * as insertions.
+  *
+  * @tparam T Type of records that this [[TableSink]] expects and supports.
+  */
+trait UpsertStreamTableSink[T] extends TableSink[JTuple2[JBool, T]] {
+
+  /**
+    * Configures the unique key fields of the [[Table]] to write.
+    * The method is called after [[TableSink.configure()]].
+    *
+    * The keys array might be empty if the table consists of a single (updated) record.
+    * If the table does not have a key and is append-only, the keys attribute is null.
+    *
+    * @param keys the field names of the table's keys, an empty array if the table has a single
+    *             row, and null if the table is append-only and has no key.
+    */
+  def setKeyFields(keys: Array[String]): Unit
+
+  /**
+    * Specifies whether the [[Table]] to write is append-only or not.
+    *
+    * @param isAppendOnly true if the table is append-only, false otherwise.
+    */
+  def setIsAppendOnly(isAppendOnly: Boolean): Unit
+
+  /** Returns the requested record type */
+  def getRecordType: TypeInformation[T]
+
+  /** Emits the DataStream. */
+  def emitDataStream(dataStream: DataStream[JTuple2[JBool, T]]): Unit
+
+  override def getOutputType = new TupleTypeInfo(Types.BOOLEAN, getRecordType)
+}
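
A minimal sketch of an upsert sink (again not part of this commit; the PrintUpsertSink and
UpsertFormatter names are made up, and the same TableSink assumptions apply). It mainly
illustrates the key-configuration contract described above:

    package org.apache.flink.table.sinks

    import java.lang.{Boolean => JBool}

    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.types.Row

    /** Formats each message: upsert for a true flag, delete for a false flag. */
    class UpsertFormatter extends MapFunction[JTuple2[JBool, Row], String] {
      override def map(v: JTuple2[JBool, Row]): String =
        (if (v.f0) "upsert: " else "delete: ") + v.f1.toString
    }

    /** Prints every upsert and delete message to stdout. */
    class PrintUpsertSink extends UpsertStreamTableSink[Row] {

      private var fieldNames: Array[String] = _
      private var fieldTypes: Array[TypeInformation[_]] = _

      // keys == null: append-only table without a key;
      // keys.isEmpty: a single, continuously updated record;
      // otherwise: the names of the unique key fields.
      override def setKeyFields(keys: Array[String]): Unit =
        println("unique key: " + (if (keys == null) "none" else keys.mkString(", ")))

      override def setIsAppendOnly(isAppendOnly: Boolean): Unit =
        println("append-only: " + isAppendOnly)

      override def getRecordType: TypeInformation[Row] =
        new RowTypeInfo(fieldTypes: _*)

      override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
        dataStream
          .map(new UpsertFormatter)
          .returns(TypeInformation.of(classOf[String]))
          .print()
      }

      override def getFieldNames: Array[String] = fieldNames

      override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes

      override def configure(
          fieldNames: Array[String],
          fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = {
        val sink = new PrintUpsertSink
        sink.fieldNames = fieldNames
        sink.fieldTypes = fieldTypes
        sink
      }
    }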

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index 675e5d9..ba3b591 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -21,7 +21,6 @@ package org.apache.flink.table
 import org.apache.flink.api.scala._
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
 import org.apache.flink.table.api.TableException

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
index d490763..40f4c7d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
@@ -27,10 +27,8 @@ import org.junit.Test
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.table.utils.TableFunc0
 
-import scala.collection.mutable
 
 /**
   * tests for retraction
@@ -55,51 +53,47 @@ class RetractionITCase extends StreamingWithStateTestBase {
   def testWordCount(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    env.setParallelism(1)
+    StreamITCase.clear
     env.setStateBackend(getStateBackend)
 
     val stream = env.fromCollection(data)
     val table = stream.toTable(tEnv, 'word, 'num)
     val resultTable = table
       .groupBy('word)
-      .select('word as 'word, 'num.sum as 'count)
+      .select('num.sum as 'count)
       .groupBy('count)
-      .select('count, 'word.count as 'frequency)
+      .select('count, 'count.count as 'frequency)
 
-    val results = resultTable.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    val results = resultTable.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
     env.execute()
 
-    val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2", "1,1", "2,2", "2,1", "3,1", "3,0",
-      "4,1", "4,0", "5,1", "5,0", "6,1", "1,2")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+    val expected = Seq("1,2", "2,1", "6,1")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
 
   // keyed groupby + non-keyed groupby
   @Test
   def testGroupByAndNonKeyedGroupBy(): Unit = {
-
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    env.setParallelism(1)
+    StreamITCase.clear
     env.setStateBackend(getStateBackend)
 
     val stream = env.fromCollection(data)
     val table = stream.toTable(tEnv, 'word, 'num)
     val resultTable = table
       .groupBy('word)
-      .select('word as 'word, 'num.sum as 'count)
-      .select('count.sum)
+      .select('word as 'word, 'num.sum as 'cnt)
+      .select('cnt.sum)
+
+    val results = resultTable.toRetractStream[Row]
 
-    val results = resultTable.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
     env.execute()
 
-    val expected = Seq("1", "2", "1", "3", "4", "3", "5", "3", "6", "3", "7", "3", "8", "3", "9",
-      "10")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+    val expected = Seq("10")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
 
   // non-keyed groupby + keyed groupby
@@ -108,8 +102,7 @@ class RetractionITCase extends StreamingWithStateTestBase {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    env.setParallelism(1)
+    StreamITCase.clear
     env.setStateBackend(getStateBackend)
 
     val stream = env.fromCollection(data)
@@ -119,13 +112,12 @@ class RetractionITCase extends StreamingWithStateTestBase {
       .groupBy('count)
       .select('count, 'count.count)
 
-    val results = resultTable.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    val results = resultTable.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
     env.execute()
 
-    val expected = Seq("1,1", "1,0", "2,1", "2,0", "3,1", "3,0", "4,1", "4,0", "5,1", "5,0", "6," +
-      "1", "6,0", "7,1", "7,0", "8,1", "8,0", "9,1", "9,0", "10,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+    val expected = Seq("10,1")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
 
   // test unique process, if the current output message of unbounded groupby equals the
@@ -150,9 +142,9 @@ class RetractionITCase extends StreamingWithStateTestBase {
     )
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    env.setParallelism(1)
+    StreamITCase.clear
     env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
 
     val stream = env.fromCollection(data)
     val table = stream.toTable(tEnv, 'pk, 'value)
@@ -162,12 +154,13 @@ class RetractionITCase extends StreamingWithStateTestBase {
       .groupBy('sum)
       .select('sum, 'pk.count as 'count)
 
-    val results = resultTable.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    val results = resultTable.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractMessagesSink)
     env.execute()
 
-    val expected = Seq("1,1", "2,1", "3,1", "3,0", "6,1", "1,2", "1,3", "6,2", "6,1", "12,1","12," +
-      "0", "18,1", "8,1")
+    val expected = Seq(
+      "+1,1", "+2,1", "+3,1", "-3,1", "+6,1", "-1,1", "+1,2", "-1,2", "+1,3", "-6,1", "+6,2",
+      "-6,2", "+6,1", "+12,1", "-12,1", "+18,1", "+8,1")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -176,8 +169,7 @@ class RetractionITCase extends StreamingWithStateTestBase {
   def testCorrelate(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    env.setParallelism(1)
+    StreamITCase.clear
     env.setStateBackend(getStateBackend)
 
     val func0 = new TableFunc0
@@ -186,19 +178,17 @@ class RetractionITCase extends StreamingWithStateTestBase {
     val table = stream.toTable(tEnv, 'word, 'num)
     val resultTable = table
       .groupBy('word)
-      .select('word as 'word, 'num.sum as 'count)
+      .select('word as 'word, 'num.sum as 'cnt)
       .leftOuterJoin(func0('word))
-      .groupBy('count)
-      .select('count, 'word.count as 'frequency)
+      .groupBy('cnt)
+      .select('cnt, 'word.count as 'frequency)
 
-    val results = resultTable.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    val results = resultTable.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
     env.execute()
 
-    val expected = Seq(
-      "1,1", "1,2", "1,1", "2,1", "1,2", "1,1", "2,2", "2,1", "3,1", "3,0", "4,1", "4,0", "5,1",
-      "5,0", "6,1", "1,2")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+    val expected = Seq("1,2", "2,1", "6,1")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
index ceae6c6..c446d64 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
@@ -23,7 +23,7 @@ import java.io.File
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.StreamTestData
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.sinks.{CsvTableSink, CsvRetractTableSink}
+import org.apache.flink.table.sinks.CsvTableSink
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.TableEnvironment
@@ -59,34 +59,5 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
 
     TestBaseUtils.compareResultsByLinesInMemory(expected, path)
   }
-
-  @Test
-  def testStreamTableSinkNeedRetraction(): Unit = {
-
-    val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
-    tmpFile.deleteOnExit()
-    val path = tmpFile.toURI.toString
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    env.setParallelism(4)
-
-    val input = StreamTestData.get3TupleDataStream(env)
-      .map(x => x).setParallelism(1) // increase DOP to 4
-
-    val results = input.toTable(tEnv, 'a, 'b, 'c)
-      .where('a < 5 || 'a > 17)
-      .select('c, 'b)
-      .groupBy('b)
-      .select('b, 'c.count)
-      .writeToSink(new CsvRetractTableSink(path))
-
-    env.execute()
-
-    val expected = Seq(
-      "true,1,1", "true,2,1", "false,2,1", "true,2,2", "true,3,1", "true,6,1", "false,6,1",
-      "true,6,2", "false,6,2", "true,6,3", "false,6,3", "true,6,4").mkString("\n")
-
-    TestBaseUtils.compareResultsByLinesInMemory(expected, path)
-  }
+  
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index abbcbdd..249d505 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -53,19 +53,19 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     val sqlQuery = "SELECT b, COUNT(a) FROM MyTable GROUP BY b"
 
-    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
     tEnv.registerTable("MyTable", t)
 
-    val result = tEnv.sql(sqlQuery).toDataStream[Row]
-    result.addSink(new StreamITCase.StringSink)
+    val result = tEnv.sql(sqlQuery).toRetractStream[Row]
+    result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
     env.execute()
 
-    val expected = mutable.MutableList("1,1", "2,1", "2,2")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+    val expected = List("1,1", "2,2", "3,3", "4,4", "5,5", "6,6")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
 
   /** test selection **/
@@ -74,7 +74,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     val sqlQuery = "SELECT a * 2, b - 1 FROM MyTable"
 
@@ -85,7 +85,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList("2,0", "4,1", "6,1")
+    val expected = List("2,0", "4,1", "6,1")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -95,7 +95,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     val sqlQuery = "SELECT * FROM MyTable WHERE a = 3"
 
@@ -106,7 +106,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList("3,2,Hello world")
+    val expected = List("3,2,Hello world")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -116,7 +116,7 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
 
@@ -127,7 +127,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList("3,2,Hello world")
+    val expected = List("3,2,Hello world")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -136,7 +136,7 @@ class SqlITCase extends StreamingWithStateTestBase {
   def testUnion(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     val sqlQuery = "SELECT * FROM T1 " +
       "UNION ALL " +
@@ -151,7 +151,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "1,1,Hi", "1,1,Hi",
       "2,2,Hello", "2,2,Hello",
       "3,2,Hello world", "3,2,Hello world")
@@ -163,7 +163,7 @@ class SqlITCase extends StreamingWithStateTestBase {
   def testUnionWithFilter(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     val sqlQuery = "SELECT * FROM T1 WHERE a = 3 " +
       "UNION ALL " +
@@ -178,7 +178,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "2,2,Hello",
       "3,2,Hello world")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
@@ -189,7 +189,7 @@ class SqlITCase extends StreamingWithStateTestBase {
   def testUnionTableWithDataSet(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     val sqlQuery = "SELECT c FROM T1 WHERE a = 3 " +
       "UNION ALL " +
@@ -204,7 +204,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList("Hello", "Hello world")
+    val expected = List("Hello", "Hello world")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -213,7 +213,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStateBackend(getStateBackend)
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     // for sum aggregation ensure that every time the order of each element is consistent
     env.setParallelism(1)
@@ -232,7 +232,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
       "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
@@ -243,7 +243,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStateBackend(getStateBackend)
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
 
@@ -259,7 +259,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "Hello World,1", "Hello World,2", "Hello World,3",
       "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
@@ -270,7 +270,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStateBackend(getStateBackend)
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     // for sum aggregation ensure that every time the order of each element is consistent
     env.setParallelism(1)
@@ -289,7 +289,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "Hello World,7,28", "Hello World,8,36", "Hello World,9,56",
       "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
@@ -300,7 +300,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStateBackend(getStateBackend)
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
 
@@ -314,7 +314,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList("1", "2", "3", "4", "5", "6", "7", "8", "9")
+    val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -363,7 +363,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
       "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
       "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
@@ -420,7 +420,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
       "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
       "Hello,3,3,7",
@@ -490,7 +490,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
       "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
       "Hello,3,4,9",
@@ -562,7 +562,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
       "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
       "Hello,3,4,9",
@@ -584,7 +584,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStateBackend(getStateBackend)
     val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
 
@@ -608,7 +608,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setStateBackend(getStateBackend)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
     env.setParallelism(1)
 
     val sqlQuery = "SELECT a, b, c, " +
@@ -661,7 +661,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "1,2,Hello,2,1,2,2,2",
       "1,3,Hello world,5,2,2,3,2",
       "1,1,Hi,6,3,2,3,1",
@@ -687,7 +687,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setStateBackend(getStateBackend)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     val sqlQuery = "SELECT a, b, c, " +
       "SUM(b) over (" +
@@ -731,7 +731,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "1,2,Hello,2,1,2,2,2",
       "1,3,Hello world,5,2,2,3,2",
       "1,1,Hi,6,3,2,3,1",
@@ -757,7 +757,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setStateBackend(getStateBackend)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
     env.setParallelism(1)
 
     val sqlQuery = "SELECT a, b, c, " +
@@ -793,7 +793,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "2,2,Hello,2,1,2,2,2",
       "3,5,Hello,7,2,3,5,2",
       "1,3,Hello,10,3,3,5,2",
@@ -812,7 +812,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setStateBackend(getStateBackend)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
     env.setParallelism(1)
 
     val sqlQuery = "SELECT a, b, c, " +
@@ -849,7 +849,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "2,2,Hello,2,1,2,2,2",
       "3,5,Hello,7,2,3,5,2",
       "1,3,Hello,10,3,3,5,2",
@@ -869,7 +869,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setStateBackend(getStateBackend)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
     env.setParallelism(1)
 
     val sqlQuery = "SELECT a, b, c, " +
@@ -907,7 +907,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "2,1,Hello,1,1,1,1,1",
       "1,1,Hello,7,4,1,3,1",
       "1,2,Hello,7,4,1,3,1",
@@ -932,7 +932,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     val tEnv = TableEnvironment.getTableEnvironment(env)
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setStateBackend(getStateBackend)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
     env.setParallelism(1)
 
     val sqlQuery = "SELECT a, b, c, " +
@@ -975,7 +975,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "1,1,Hello,6,3,2,3,1",
       "1,2,Hello,6,3,2,3,1",
       "1,3,Hello world,6,3,2,3,1",
@@ -1000,7 +1000,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     env.setStateBackend(getStateBackend)
     val tEnv = TableEnvironment.getTableEnvironment(env)
     env.setParallelism(1)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     val t = StreamTestData.get5TupleDataStream(env)
       .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
@@ -1017,7 +1017,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "1,0,0",
       "2,1,1",
       "2,3,1",
@@ -1043,7 +1043,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     env.setStateBackend(getStateBackend)
     val tEnv = TableEnvironment.getTableEnvironment(env)
     env.setParallelism(1)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     val t = StreamTestData.get5TupleDataStream(env)
       .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
@@ -1060,7 +1060,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "1,0,0",
       "2,1,1",
       "2,3,1",
@@ -1087,7 +1087,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     env.setStateBackend(getStateBackend)
     val tEnv = TableEnvironment.getTableEnvironment(env)
     env.setParallelism(1)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     val t = StreamTestData.get5TupleDataStream(env)
       .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
@@ -1104,7 +1104,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "1,0,0",
       "2,1,0",
       "2,3,0",
@@ -1130,7 +1130,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     env.setStateBackend(getStateBackend)
     val tEnv = TableEnvironment.getTableEnvironment(env)
     env.setParallelism(1)
-    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
 
     val t = StreamTestData.get5TupleDataStream(env)
       .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
@@ -1146,7 +1146,7 @@ class SqlITCase extends StreamingWithStateTestBase {
     result.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = mutable.MutableList(
+    val expected = List(
       "1,0,0",
       "2,1,0",
       "2,3,0",

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
index 271e90b..910cbf2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase.RetractingSink
 import org.apache.flink.types.Row
 import org.junit.Assert.assertEquals
 import org.junit.Test
@@ -38,29 +39,24 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
   def testNonKeyedGroupAggregate(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStateBackend(getStateBackend)
-    env.setParallelism(1)
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.clear
 
     val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
             .select('a.sum, 'b.sum)
 
-    val results = t.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    val results = t.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
     env.execute()
 
-    val expected = mutable.MutableList(
-      "1,1", "3,3", "6,5", "10,8", "15,11", "21,14", "28,18", "36,22", "45,26", "55,30", "66,35",
-      "78,40", "91,45", "105,50", "120,55", "136,61", "153,67", "171,73", "190,79", "210,85",
-      "231,91")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+    val expected = List("231,91")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
 
   @Test
   def testGroupAggregate(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStateBackend(getStateBackend)
-    env.setParallelism(1)
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.clear
 
@@ -68,15 +64,12 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
       .groupBy('b)
       .select('b, 'a.sum)
 
-    val results = t.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    val results = t.toRetractStream[Row]
+    results.addSink(new StreamITCase.RetractingSink)
     env.execute()
 
-    val expected = mutable.MutableList(
-      "1,1", "2,2", "2,5", "3,4", "3,9", "3,15", "4,7", "4,15",
-      "4,24", "4,34", "5,11", "5,23", "5,36", "5,50", "5,65", "6,16", "6,33", "6,51", "6,70",
-      "6,90", "6,111")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+    val expected = List("1,1", "2,5", "3,15", "4,34", "5,65", "6,111")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
 
   @Test
@@ -88,30 +81,22 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
 
     val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
       .groupBy('b)
-      .select('a.sum as 'd, 'b)
-      .groupBy('b, 'd)
-      .select('b)
+      .select('a.count as 'cnt, 'b)
+      .groupBy('cnt)
+      .select('cnt, 'b.count as 'freq)
 
-    val results = t.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
+    val results = t.toRetractStream[Row]
 
-    val expected = mutable.MutableList(
-      "1",
-      "2", "2",
-      "3", "3", "3",
-      "4", "4", "4", "4",
-      "5", "5", "5", "5", "5",
-      "6", "6", "6", "6", "6", "6")
-
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+    results.addSink(new RetractingSink)
+    env.execute()
+    val expected = List("1,1", "2,1", "3,1", "4,1", "5,1", "6,1")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
 
   @Test
   def testGroupAggregateWithExpression(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStateBackend(getStateBackend)
-    env.setParallelism(1)
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.clear
 
@@ -119,14 +104,14 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
       .groupBy('e, 'b % 3)
       .select('c.min, 'e, 'a.avg, 'd.count)
 
-    val results = t.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
+    val results = t.toRetractStream[Row]
+    results.addSink(new RetractingSink)
     env.execute()
 
     val expected = mutable.MutableList(
-      "0,1,1,1", "1,2,2,1", "2,1,2,1", "3,2,3,1", "1,2,2,2",
-      "5,3,3,1", "3,2,3,2", "7,1,4,1", "2,1,3,2", "3,2,3,3", "7,1,4,2", "5,3,4,2", "12,3,5,1",
-      "1,2,3,3", "14,2,5,1")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+      "0,1,1,1", "7,1,4,2", "2,1,3,2",
+      "3,2,3,3", "1,2,3,3", "14,2,5,1",
+      "12,3,5,1", "5,3,4,2")
+    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
index ea3ab22..96e5eb5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
@@ -35,7 +35,7 @@ class OverWindowTest extends TableTestBase {
     val result = table
       .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w)
       .select('c, 'b.count over 'x)
-    streamUtil.tEnv.optimize(result.getRelNode)
+    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
   }
 
   @Test(expected = classOf[ValidationException])
@@ -43,7 +43,7 @@ class OverWindowTest extends TableTestBase {
     val result = table
       .window(Over partitionBy 'c orderBy 'abc preceding 2.rows as 'w)
       .select('c, 'b.count over 'w)
-    streamUtil.tEnv.optimize(result.getRelNode)
+    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
   }
 
   @Test(expected = classOf[ValidationException])
@@ -51,7 +51,7 @@ class OverWindowTest extends TableTestBase {
     val result = table
       .window(Over partitionBy 'c orderBy 'rowtime preceding 2 following "xx" as 'w)
       .select('c, 'b.count over 'w)
-    streamUtil.tEnv.optimize(result.getRelNode)
+    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
   }
 
   @Test(expected = classOf[ValidationException])
@@ -59,7 +59,7 @@ class OverWindowTest extends TableTestBase {
     val result = table
       .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_RANGE as 'w)
       .select('c, 'b.count over 'w)
-    streamUtil.tEnv.optimize(result.getRelNode)
+    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
   }
 
   @Test(expected = classOf[ValidationException])
@@ -67,7 +67,7 @@ class OverWindowTest extends TableTestBase {
     val result = table
       .window(Over partitionBy 'a + 'b orderBy 'rowtime preceding 2.rows as 'w)
       .select('c, 'b.count over 'w)
-    streamUtil.tEnv.optimize(result.getRelNode)
+    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
   }
 
   @Test(expected = classOf[ValidationException])
@@ -77,7 +77,7 @@ class OverWindowTest extends TableTestBase {
     val result = table2
       .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w)
       .select('c, 'b.count over 'w)
-    streamUtil.tEnv.optimize(result.getRelNode)
+    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
   }
 
   @Test(expected = classOf[ValidationException])
@@ -85,7 +85,7 @@ class OverWindowTest extends TableTestBase {
     val result = table
       .window(Over orderBy 'rowtime preceding -1.rows as 'w)
       .select('c, 'b.count over 'w)
-    streamUtil.tEnv.optimize(result.getRelNode)
+    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
   }
 
   @Test(expected = classOf[ValidationException])
@@ -93,7 +93,7 @@ class OverWindowTest extends TableTestBase {
     val result = table
       .window(Over orderBy 'rowtime preceding 1.rows following -2.rows as 'w)
       .select('c, 'b.count over 'w)
-    streamUtil.tEnv.optimize(result.getRelNode)
+    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
   }
 
   @Test(expected = classOf[ValidationException])
@@ -103,7 +103,7 @@ class OverWindowTest extends TableTestBase {
     val result = table
       .window(Over orderBy 'rowtime preceding 1.minutes as 'w)
       .select('c, weightedAvg('b, 'a) over 'w)
-    streamUtil.tEnv.optimize(result.getRelNode)
+    streamUtil.tEnv.optimize(result.getRelNode, updatesAsRetraction = true)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
index 497869d..effde8e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
@@ -25,16 +25,18 @@ import org.junit.Assert._
 
 import scala.collection.mutable
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import org.apache.flink.table.runtime.types.CRow
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
 object StreamITCase {
 
-  var testResults = mutable.MutableList.empty[String]
+  var testResults: mutable.MutableList[String] = mutable.MutableList.empty[String]
+  var retractedResults: ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
 
   def clear = {
     StreamITCase.testResults.clear()
+    StreamITCase.retractedResults.clear()
   }
 
   def compareWithList(expected: java.util.List[String]): Unit = {
@@ -49,4 +51,33 @@ object StreamITCase {
       }
     }
   }
+
+  final class RetractMessagesSink extends RichSinkFunction[(Boolean, Row)]() {
+    def invoke(v: (Boolean, Row)) {
+      testResults.synchronized {
+        testResults += (if (v._1) "+" else "-") + v._2
+      }
+    }
+  }
+
+  final class RetractingSink() extends RichSinkFunction[(Boolean, Row)] {
+    def invoke(v: (Boolean, Row)) {
+      retractedResults.synchronized {
+        val value = v._2.toString
+        if (v._1) {
+          retractedResults += value
+        } else {
+          val idx = retractedResults.indexOf(value)
+          if (idx >= 0) {
+            retractedResults.remove(idx)
+          } else {
+            throw new RuntimeException("Tried to retract a value that wasn't added first. " +
+              "This is probably an incorrectly implemented test. " +
+              "Try to set the parallelism of the sink to 1.")
+          }
+        }
+      }
+    }
+  }
+
 }
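
To see how RetractingSink folds a change stream into its final contents, here is a small
standalone sketch of the same add/remove logic (the message values are made up). It also shows
why the updated tests above compare against only the net results rather than every
intermediate update:

    import scala.collection.mutable.ArrayBuffer

    object RetractionFolding {
      def main(args: Array[String]): Unit = {
        // (flag, value): a true flag adds the value, a false flag retracts it again
        val messages = Seq((true, "1,1"), (true, "2,1"), (false, "1,1"), (true, "1,2"))

        val state = ArrayBuffer.empty[String]
        for ((isAdd, value) <- messages) {
          if (isAdd) state += value
          else state.remove(state.indexOf(value))
        }

        // Only the net result survives: ArrayBuffer(2,1, 1,2)
        println(state)
      }
    }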

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala
index 580029f..861f70e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/rules/RetractionRulesTest.scala
@@ -292,7 +292,7 @@ class StreamTableTestForRetractionUtil extends StreamTableTestUtil {
 
   def verifyTableTrait(resultTable: Table, expected: String): Unit = {
     val relNode = resultTable.getRelNode
-    val optimized = tEnv.optimize(relNode)
+    val optimized = tEnv.optimize(relNode, updatesAsRetraction = false)
     val actual = TraitUtil.toString(optimized)
     assertEquals(
       expected.split("\n").map(_.trim).mkString("\n"),

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
new file mode 100644
index 0000000..2dfb658
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sinks/StreamTableSinksITCase.scala
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.types.Row
+
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class StreamTableSinksITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test(expected = classOf[TableException])
+  def testAppendSinkOnUpdatingTable(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text)
+
+    t.groupBy('text)
+      .select('text, 'id.count, 'num.sum)
+      .writeToSink(new TestAppendSink)
+
+    // must fail because the table is not append-only
+    env.execute()
+  }
+
+  @Test
+  def testAppendSinkOnAppendTable(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+        .assignAscendingTimestamps(_._1.toLong)
+        .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    t.window(Tumble over 5.millis on 'rowtime as 'w)
+      .groupBy('w)
+      .select('w.end, 'id.count, 'num.sum)
+      .writeToSink(new TestAppendSink)
+
+    env.execute()
+
+    val result = RowCollector.getAndClearValues.map(_.f1.toString).sorted
+    val expected = List(
+      "1970-01-01 00:00:00.005,4,8",
+      "1970-01-01 00:00:00.01,5,18",
+      "1970-01-01 00:00:00.015,5,24",
+      "1970-01-01 00:00:00.02,5,29",
+      "1970-01-01 00:00:00.025,2,12")
+      .sorted
+    assertEquals(expected, result)
+  }
+
+  @Test
+  def testRetractSinkOnUpdatingTable(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text)
+
+    t.select('id, 'num, 'text.charLength() as 'len)
+      .groupBy('len)
+      .select('len, 'id.count, 'num.sum)
+      .writeToSink(new TestRetractSink)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    val retracted = retractResults(results).sorted
+    val expected = List(
+      "2,1,1",
+      "5,1,2",
+      "11,1,2",
+      "25,1,3",
+      "10,7,39",
+      "14,1,3",
+      "9,9,41").sorted
+    assertEquals(expected, retracted)
+
+  }
+
+  @Test
+  def testRetractSinkOnAppendTable(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    t.window(Tumble over 5.millis on 'rowtime as 'w)
+      .groupBy('w)
+      .select('w.end, 'id.count, 'num.sum)
+      .writeToSink(new TestRetractSink)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = retractResults(results).sorted
+    val expected = List(
+      "1970-01-01 00:00:00.005,4,8",
+      "1970-01-01 00:00:00.01,5,18",
+      "1970-01-01 00:00:00.015,5,24",
+      "1970-01-01 00:00:00.02,5,29",
+      "1970-01-01 00:00:00.025,2,12")
+      .sorted
+    assertEquals(expected, retracted)
+
+  }
+
+  @Test
+  def testUpsertSinkOnUpdatingTableWithFullKey(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text)
+
+    t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
+      .groupBy('len, 'cTrue)
+      .select('len, 'id.count as 'cnt, 'cTrue)
+      .groupBy('cnt, 'cTrue)
+      .select('cnt, 'len.count, 'cTrue)
+      .writeToSink(new TestUpsertSink(Array("cnt", "cTrue"), false))
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertTrue(
+      "Results must include delete messages",
+      results.exists(!_.f0)
+    )
+
+    val retracted = upsertResults(results, Array(0, 2)).sorted
+    val expected = List(
+      "1,5,true",
+      "7,1,true",
+      "9,1,true").sorted
+    assertEquals(expected, retracted)
+
+  }
+
+  @Test(expected = classOf[TableException])
+  def testUpsertSinkOnUpdatingTableWithoutFullKey(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text)
+
+    t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
+      .groupBy('len, 'cTrue)
+      .select('len, 'id.count, 'num.sum)
+      .writeToSink(new TestUpsertSink(Array("len", "cTrue"), false))
+
+    // must fail because the table is an updating table without a full key
+    env.execute()
+  }
+
+  @Test
+  def testUpsertSinkOnAppendingTableWithFullKey1(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    t.window(Tumble over 5.millis on 'rowtime as 'w)
+      .groupBy('w, 'num)
+      .select('num, 'w.end as 'wend, 'id.count)
+      .writeToSink(new TestUpsertSink(Array("wend", "num"), true))
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = upsertResults(results, Array(0, 1, 2)).sorted
+    val expected = List(
+      "1,1970-01-01 00:00:00.005,1",
+      "2,1970-01-01 00:00:00.005,2",
+      "3,1970-01-01 00:00:00.005,1",
+      "3,1970-01-01 00:00:00.01,2",
+      "4,1970-01-01 00:00:00.01,3",
+      "4,1970-01-01 00:00:00.015,1",
+      "5,1970-01-01 00:00:00.015,4",
+      "5,1970-01-01 00:00:00.02,1",
+      "6,1970-01-01 00:00:00.02,4",
+      "6,1970-01-01 00:00:00.025,2").sorted
+    assertEquals(expected, retracted)
+  }
+
+  @Test
+  def testUpsertSinkOnAppendingTableWithFullKey2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    t.window(Tumble over 5.millis on 'rowtime as 'w)
+      .groupBy('w, 'num)
+      .select('w.start as 'wstart, 'w.end as 'wend, 'num, 'id.count)
+      .writeToSink(new TestUpsertSink(Array("wstart", "wend", "num"), true))
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = upsertResults(results, Array(0, 1, 2)).sorted
+    val expected = List(
+      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1,1",
+      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,2,2",
+      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,3,1",
+      "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,3,2",
+      "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,4,3",
+      "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,4,1",
+      "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,5,4",
+      "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,5,1",
+      "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,6,4",
+      "1970-01-01 00:00:00.02,1970-01-01 00:00:00.025,6,2").sorted
+    assertEquals(expected, retracted)
+  }
+
+  @Test
+  def testUpsertSinkOnAppendingTableWithoutFullKey1(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    t.window(Tumble over 5.millis on 'rowtime as 'w)
+      .groupBy('w, 'num)
+      .select('w.end as 'wend, 'id.count as 'cnt)
+      .writeToSink(new TestUpsertSink(null, true))
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = results.map(_.f1.toString).sorted
+    val expected = List(
+      "1970-01-01 00:00:00.005,1",
+      "1970-01-01 00:00:00.005,2",
+      "1970-01-01 00:00:00.005,1",
+      "1970-01-01 00:00:00.01,2",
+      "1970-01-01 00:00:00.01,3",
+      "1970-01-01 00:00:00.015,1",
+      "1970-01-01 00:00:00.015,4",
+      "1970-01-01 00:00:00.02,1",
+      "1970-01-01 00:00:00.02,4",
+      "1970-01-01 00:00:00.025,2").sorted
+    assertEquals(expected, retracted)
+  }
+
+  @Test
+  def testUpsertSinkOnAppendingTableWithoutFullKey2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    t.window(Tumble over 5.millis on 'rowtime as 'w)
+      .groupBy('w, 'num)
+      .select('num, 'id.count as 'cnt)
+      .writeToSink(new TestUpsertSink(null, true))
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = results.map(_.f1.toString).sorted
+    val expected = List(
+      "1,1",
+      "2,2",
+      "3,1",
+      "3,2",
+      "4,3",
+      "4,1",
+      "5,4",
+      "5,1",
+      "6,4",
+      "6,2").sorted
+    assertEquals(expected, retracted)
+  }
+
+  /** Converts a list of retraction messages into a list of final results. */
+  private def retractResults(results: List[JTuple2[JBool, Row]]): List[String] = {
+
+    val retracted = results
+      .foldLeft(Map[String, Int]()){ (m: Map[String, Int], v: JTuple2[JBool, Row]) =>
+        val cnt = m.getOrElse(v.f1.toString, 0)
+        if (v.f0) {
+          m + (v.f1.toString -> (cnt + 1))
+        } else {
+          m + (v.f1.toString -> (cnt - 1))
+        }
+      }.filter{ case (_, c: Int) => c != 0 }
+
+    assertFalse(
+      "Received retracted rows which have not been accumulated.",
+      retracted.exists{ case (_, c: Int) => c < 0})
+
+    retracted.flatMap { case (r: String, c: Int) => (0 until c).map(_ => r) }.toList
+  }
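+
+  // Worked example for retractResults (illustrative, not actual test data):
+  //   (true, "a"), (true, "a"), (false, "a"), (true, "b")
+  // folds to Map("a" -> 1, "b" -> 1); entries with count 0 drop out, and a
+  // negative count means a row was retracted before it was ever accumulated.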
+
+  /** Converts a list of upsert messages into a list of final results. */
+  private def upsertResults(results: List[JTuple2[JBool, Row]], keys: Array[Int]): List[String] = {
+
+    def getKeys(r: Row): List[String] =
+      keys.map(i => r.getField(i).toString).toList
+
+    val upserted = results.foldLeft(Map[String, String]()){ (o: Map[String, String], r) =>
+      val key = getKeys(r.f1).mkString(",")  // delimited to avoid collisions such as ("1","23") vs ("12","3")
+      if (r.f0) {
+        o + (key -> r.f1.toString)
+      } else {
+        o - key
+      }
+    }
+
+    upserted.values.toList
+  }
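+
+  // Worked example for upsertResults with keys = Array(0) (illustrative):
+  //   (true, Row("a", 1)) puts "a" -> "a,1"; (true, Row("a", 2)) overwrites
+  //   it with "a,2"; (false, Row("a", 2)) deletes key "a" entirely.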
+
+}
+
+private class TestAppendSink extends AppendStreamTableSink[Row] {
+
+  var fNames: Array[String] = _
+  var fTypes: Array[TypeInformation[_]] = _
+
+  override def emitDataStream(s: DataStream[Row]): Unit = {
+    s.map(
+      new MapFunction[Row, JTuple2[JBool, Row]] {
+        override def map(value: Row): JTuple2[JBool, Row] = new JTuple2(true, value)
+      })
+      .addSink(new RowSink)
+  }
+
+  override def getOutputType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
+
+  override def getFieldNames: Array[String] = fNames
+
+  override def getFieldTypes: Array[TypeInformation[_]] = fTypes
+
+  override def configure(
+    fieldNames: Array[String],
+    fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
+    val copy = new TestAppendSink
+    copy.fNames = fieldNames
+    copy.fTypes = fieldTypes
+    copy
+  }
+}
+
+private class TestRetractSink extends RetractStreamTableSink[Row] {
+
+  var fNames: Array[String] = _
+  var fTypes: Array[TypeInformation[_]] = _
+
+  override def emitDataStream(s: DataStream[JTuple2[JBool, Row]]): Unit = {
+    s.addSink(new RowSink)
+  }
+
+  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
+
+  override def getFieldNames: Array[String] = fNames
+
+  override def getFieldTypes: Array[TypeInformation[_]] = fTypes
+
+  override def configure(
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = {
+    val copy = new TestRetractSink
+    copy.fNames = fieldNames
+    copy.fTypes = fieldTypes
+    copy
+  }
+
+}
+
+private class TestUpsertSink(
+    expectedKeys: Array[String],
+    expectedIsAppendOnly: Boolean)
+  extends UpsertStreamTableSink[Row] {
+
+  var fNames: Array[String] = _
+  var fTypes: Array[TypeInformation[_]] = _
+
+  override def setKeyFields(keys: Array[String]): Unit =
+    if (keys != null) {
+      assertEquals("Provided key fields do not match expected keys",
+        expectedKeys.sorted.mkString(","),
+        keys.sorted.mkString(","))
+    } else {
+      assertNull("Key fields were expected but none were provided.", expectedKeys)
+    }
+
+  override def setIsAppendOnly(isAppendOnly: Boolean): Unit =
+    assertEquals(
+      "Provided isAppendOnly does not match expected isAppendOnly",
+      expectedIsAppendOnly,
+      isAppendOnly)
+
+  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
+
+  override def emitDataStream(s: DataStream[JTuple2[JBool, Row]]): Unit = {
+    s.addSink(new RowSink)
+  }
+
+  override def getFieldNames: Array[String] = fNames
+
+  override def getFieldTypes: Array[TypeInformation[_]] = fTypes
+
+  override def configure(
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]]): TableSink[JTuple2[JBool, Row]] = {
+    val copy = new TestUpsertSink(expectedKeys, expectedIsAppendOnly)
+    copy.fNames = fieldNames
+    copy.fTypes = fieldTypes
+    copy
+  }
+}
+
+class RowSink extends SinkFunction[JTuple2[JBool, Row]] {
+  override def invoke(value: JTuple2[JBool, Row]): Unit = RowCollector.addValue(value)
+}
+
+object RowCollector {
+  private val sink: mutable.ArrayBuffer[JTuple2[JBool, Row]] =
+    new mutable.ArrayBuffer[JTuple2[JBool, Row]]()
+
+  def addValue(value: JTuple2[JBool, Row]): Unit = {
+    sink.synchronized {
+      sink += value
+    }
+  }
+
+  def getAndClearValues: List[JTuple2[JBool, Row]] = {
+    // mirror the synchronization in addValue to avoid reading a partially updated buffer
+    sink.synchronized {
+      val out = sink.toList
+      sink.clear()
+      out
+    }
+  }
+
+}
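
A note on the change-flag convention shared by all three test sinks: the Boolean in JTuple2[JBool, Row] marks an insert or accumulate message when true and a delete or retract message when false. A minimal sketch of consuming such a stream against an external store (upsertInStore and deleteFromStore are hypothetical helpers, not part of this patch):

    changes.addSink(new SinkFunction[JTuple2[JBool, Row]] {
      override def invoke(v: JTuple2[JBool, Row]): Unit =
        if (v.f0) upsertInStore(v.f1) else deleteFromStore(v.f1)
    })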

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index 79e957a..8626b07 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -39,9 +39,4 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
 
   override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
 
-  override protected def getConversionMapper[IN, OUT](
-      physicalTypeInfo: TypeInformation[IN],
-      logicalRowType: RelDataType,
-      requestedTypeInfo: TypeInformation[OUT],
-      functionName: String) = ???
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f37988c1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 73bc2f8..0e6d461 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -211,7 +211,7 @@ case class StreamTableTestUtil() extends TableTestUtil {
 
   def verifyTable(resultTable: Table, expected: String): Unit = {
     val relNode = resultTable.getRelNode
-    val optimized = tEnv.optimize(relNode)
+    val optimized = tEnv.optimize(relNode, updatesAsRetraction = false)
     val actual = RelOptUtil.toString(optimized)
     assertEquals(
       expected.split("\n").map(_.trim).mkString("\n"),
@@ -221,7 +221,7 @@ case class StreamTableTestUtil() extends TableTestUtil {
   // the print methods are for debugging purposes only
   def printTable(resultTable: Table): Unit = {
     val relNode = resultTable.getRelNode
-    val optimized = tEnv.optimize(relNode)
+    val optimized = tEnv.optimize(relNode, updatesAsRetraction = false)
     println(RelOptUtil.toString(optimized))
   }
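
These utility call sites pass updatesAsRetraction explicitly because optimize now takes the retraction requirement as a second parameter. A sketch of how the flag would be derived when translating a query to an actual sink (the pattern match illustrates the intent and is not quoted from this patch):

    val updatesAsRetraction = sink match {
      case _: RetractStreamTableSink[_] => true  // sink consumes retract messages
      case _ => false                            // append-only or upsert handling applies
    }
    val optimizedPlan = tEnv.optimize(table.getRelNode, updatesAsRetraction)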