Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/26 05:49:48 UTC

[flink] branch master updated: [FLINK-13289][table-planner-blink] Blink planner should setKeyFields to upsert table sink

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 35237ae  [FLINK-13289][table-planner-blink] Blink planner should setKeyFields to upsert table sink
35237ae is described below

commit 35237ae8c42f2184717d8f5eee1cf8efed2c63c7
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Mon Jul 22 16:04:34 2019 +0800

    [FLINK-13289][table-planner-blink] Blink planner should setKeyFields to upsert table sink
    
    This closes #9195
---
 .../nodes/physical/stream/StreamExecSink.scala     |  20 +
 .../planner/plan/utils/UpdatingPlanChecker.scala   | 202 +-------
 .../runtime/stream/sql/Limit0RemoveITCase.scala    |   8 +-
 .../planner/runtime/stream/sql/RankITCase.scala    |   4 +-
 .../planner/runtime/stream/table/JoinITCase.scala  |   4 +-
 .../runtime/stream/table/TableSinkITCase.scala     | 524 +++++++++++++++++++++
 .../planner/runtime/utils/StreamTestSink.scala     |  21 +-
 7 files changed, 583 insertions(+), 200 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
index e31264a..22a8ede 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSink.scala
@@ -97,6 +97,26 @@ class StreamExecSink[T](
             // check for append only table
             val isAppendOnlyTable = UpdatingPlanChecker.isAppendOnly(this)
             upsertSink.setIsAppendOnly(isAppendOnlyTable)
+
+            // extract the unique key fields
+            // for now we pick the shortest key and pass it to the sink
+            // TODO the UpsertStreamTableSink#setKeyFields interface should accept Array[Array[String]]
+            val tableKeys = {
+              UpdatingPlanChecker.getUniqueKeyFields(getInput, planner) match {
+                case Some(keys) => keys.sortBy(_.length).headOption
+                case None => None
+              }
+            }
+
+            // check that we have keys if the table has changes (is not append-only)
+            tableKeys match {
+              case Some(keys) => upsertSink.setKeyFields(keys)
+              case None if isAppendOnlyTable => upsertSink.setKeyFields(null)
+              case None if !isAppendOnlyTable => throw new TableException(
+                "UpsertStreamTableSink requires that the Table has" +
+                    " a full primary key if it is updated.")
+            }
+
             translateToTransformation(withChangeFlag = true, planner)
 
           case _: AppendStreamTableSink[T] =>
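
[Editor's illustration] The selection logic added in the hunk above reduces to the
following standalone sketch. The UpsertSinkStub class and the sample key sets are
invented for this example and are not Flink API; the real code calls setKeyFields
and setIsAppendOnly on the user-provided UpsertStreamTableSink.

    object KeySelectionSketch {
      // hypothetical stand-in for UpsertStreamTableSink#setKeyFields / #setIsAppendOnly
      class UpsertSinkStub {
        var keyFields: Array[String] = _
        var appendOnly: Boolean = _
        def setKeyFields(keys: Array[String]): Unit = keyFields = keys
        def setIsAppendOnly(isAppendOnly: Boolean): Unit = appendOnly = isAppendOnly
      }

      def configureSink(
          uniqueKeys: Option[Array[Array[String]]],
          isAppendOnlyTable: Boolean,
          sink: UpsertSinkStub): Unit = {
        sink.setIsAppendOnly(isAppendOnlyTable)
        // pick the shortest unique key, mirroring the new code in StreamExecSink
        val tableKeys = uniqueKeys.flatMap(_.sortBy(_.length).headOption)
        tableKeys match {
          case Some(keys) => sink.setKeyFields(keys)
          case None if isAppendOnlyTable => sink.setKeyFields(null)
          case None => throw new IllegalStateException(
            "an updating table without a full primary key cannot be written to an upsert sink")
        }
      }

      def main(args: Array[String]): Unit = {
        val sink = new UpsertSinkStub
        // two candidate keys: (a, b) and (pk); the shorter one wins
        configureSink(Some(Array(Array("a", "b"), Array("pk"))), isAppendOnlyTable = false, sink)
        println(sink.keyFields.mkString(","))  // prints: pk
      }
    }
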
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/UpdatingPlanChecker.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/UpdatingPlanChecker.scala
index e412e12..b0aaf9e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/UpdatingPlanChecker.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/UpdatingPlanChecker.scala
@@ -17,16 +17,15 @@
  */
 package org.apache.flink.table.planner.plan.utils
 
+import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.physical.stream._
 
 import org.apache.calcite.plan.hep.HepRelVertex
 import org.apache.calcite.plan.volcano.RelSubset
 import org.apache.calcite.rel.{RelNode, RelVisitor}
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
-import org.apache.calcite.sql.SqlKind
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable
 
 object UpdatingPlanChecker {
 
@@ -39,14 +38,15 @@ object UpdatingPlanChecker {
   }
 
   /** Extracts the unique keys of the table produced by the plan. */
-  def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
-    getUniqueKeyGroups(plan).map(_.map(_._1).toArray)
-  }
-
-  /** Extracts the unique keys and groups of the table produced by the plan. */
-  def getUniqueKeyGroups(plan: RelNode): Option[Seq[(String, String)]] = {
-    val keyExtractor = new UniqueKeyExtractor
-    keyExtractor.visit(plan)
+  def getUniqueKeyFields(relNode: RelNode, planner: StreamPlanner): Option[Array[Array[String]]] = {
+    val rowType = relNode.getRowType
+    val fmq = FlinkRelMetadataQuery.reuseOrCreate(planner.getRelBuilder.getCluster.getMetadataQuery)
+    val uniqueKeys = fmq.getUniqueKeys(relNode)
+    if (uniqueKeys != null && uniqueKeys.size() > 0) {
+      Some(uniqueKeys.filter(_.nonEmpty).map(_.toArray.map(rowType.getFieldNames.get)).toArray)
+    } else {
+      None
+    }
   }
 
   private class AppendOnlyValidator extends RelVisitor {
@@ -55,7 +55,7 @@ object UpdatingPlanChecker {
 
     override def visit(node: RelNode, ordinal: Int, parent: RelNode): Unit = {
       node match {
-        case s: StreamPhysicalRel if s.producesUpdates =>
+        case s: StreamPhysicalRel if s.producesUpdates || s.producesRetractions =>
           isAppendOnly = false
         case hep: HepRelVertex =>
           visit(hep.getCurrentRel, ordinal, parent)   //remove wrapper node
@@ -66,182 +66,4 @@ object UpdatingPlanChecker {
       }
     }
   }
-
-  /** Identifies unique key fields in the output of a RelNode. */
-  private class UniqueKeyExtractor {
-
-    // visit() function will return a tuple, the first element is the name of a key field, the
-    // second is a group name that is shared by all equivalent key fields. The group names are
-    // used to identify same keys, for example: select('pk as pk1, 'pk as pk2), both pk1 and pk2
-    // belong to the same group, i.e., pk1. Here we use the lexicographic smallest attribute as
-    // the common group id. A node can have keys if it generates the keys by itself or it
-    // forwards keys from its input(s).
-    def visit(node: RelNode): Option[Seq[(String, String)]] = {
-      node match {
-        case c: StreamExecCalc =>
-          val inputKeys = visit(node.getInput(0))
-          // check if input has keys
-          if (inputKeys.isDefined) {
-            // track keys forward
-            val inNames = c.getInput.getRowType.getFieldNames
-            val inOutNames = c.getProgram.getNamedProjects.map(p => {
-              c.getProgram.expandLocalRef(p.left) match {
-                // output field is forwarded input field
-                case i: RexInputRef => (i.getIndex, p.right)
-                // output field is renamed input field
-                case a: RexCall if a.getKind.equals(SqlKind.AS) =>
-                  a.getOperands.get(0) match {
-                    case ref: RexInputRef =>
-                      (ref.getIndex, p.right)
-                    case _ =>
-                      (-1, p.right)
-                  }
-                // output field is not forwarded from input
-                case _: RexNode => (-1, p.right)
-              }
-            })
-              // filter all non-forwarded fields
-              .filter(_._1 >= 0)
-              // resolve names of input fields
-              .map(io => (inNames.get(io._1), io._2))
-
-            // filter by input keys
-            val inputKeysAndOutput = inOutNames
-              .filter(io => inputKeys.get.map(e => e._1).contains(io._1))
-
-            val inputKeysMap = inputKeys.get.toMap
-            val inOutGroups = inputKeysAndOutput.sorted.reverse
-              .map(e => (inputKeysMap(e._1), e._2))
-              .toMap
-
-            // get output keys
-            val outputKeys = inputKeysAndOutput
-              .map(io => (io._2, inOutGroups(inputKeysMap(io._1))))
-
-            // check if all keys have been preserved
-            if (outputKeys.map(_._2).distinct.length == inputKeys.get.map(_._2).distinct.length) {
-              // all key have been preserved (but possibly renamed)
-              Some(outputKeys)
-            } else {
-              // some (or all) keys have been removed. Keys are no longer unique and removed
-              None
-            }
-          } else {
-            None
-          }
-
-        case _: StreamExecOverAggregate =>
-          // keys are always forwarded by Over aggregate
-          visit(node.getInput(0))
-        case a: StreamExecGroupAggregate =>
-          // get grouping keys
-          val groupKeys = a.getRowType.getFieldNames.take(a.grouping.length)
-          Some(groupKeys.map(e => (e, e)))
-
-        // TODO supports StreamExecGroupWindowAggregate
-
-        case j: StreamExecJoin =>
-          // get key(s) for join
-          val lInKeys = visit(j.getLeft)
-          val rInKeys = visit(j.getRight)
-          if (lInKeys.isEmpty || rInKeys.isEmpty) {
-            None
-          } else {
-            // Output of join must have keys if left and right both contain key(s).
-            // Key groups from both side will be merged by join equi-predicates
-            val lInNames: Seq[String] = j.getLeft.getRowType.getFieldNames
-            val rInNames: Seq[String] = j.getRight.getRowType.getFieldNames
-            val joinNames = j.getRowType.getFieldNames
-
-            // if right field names equal to left field names, calcite will rename right
-            // field names. For example, T1(pk, a) join T2(pk, b), calcite will rename T2(pk, b)
-            // to T2(pk0, b).
-            val rInNamesToJoinNamesMap = rInNames
-              .zip(joinNames.subList(lInNames.size, joinNames.length))
-              .toMap
-
-            val lJoinKeys: Seq[String] = j.getJoinInfo.leftKeys
-              .map(lInNames.get(_))
-            val rJoinKeys: Seq[String] = j.getJoinInfo.rightKeys
-              .map(rInNames.get(_))
-              .map(rInNamesToJoinNamesMap(_))
-
-            val inKeys: Seq[(String, String)] = lInKeys.get ++ rInKeys.get
-              .map(e => (rInNamesToJoinNamesMap(e._1), rInNamesToJoinNamesMap(e._2)))
-
-            getOutputKeysForNonWindowJoin(
-              joinNames,
-              inKeys,
-              lJoinKeys.zip(rJoinKeys)
-            )
-          }
-        case _: StreamPhysicalRel =>
-          // anything else does not forward keys, so we can stop
-          None
-      }
-    }
-
-    /**
-      * Get output keys for non-window join according to it's inputs.
-      *
-      * @param inNames  Field names of join
-      * @param inKeys   Input keys of join
-      * @param joinKeys JoinKeys of join
-      * @return Return output keys of join
-      */
-    def getOutputKeysForNonWindowJoin(
-        inNames: Seq[String],
-        inKeys: Seq[(String, String)],
-        joinKeys: Seq[(String, String)])
-    : Option[Seq[(String, String)]] = {
-
-      val nameToGroups = mutable.HashMap.empty[String, String]
-
-      // merge two groups
-      def merge(nameA: String, nameB: String): Unit = {
-        val ga: String = findGroup(nameA)
-        val gb: String = findGroup(nameB)
-        if (!ga.equals(gb)) {
-          if (ga.compare(gb) < 0) {
-            nameToGroups += (gb -> ga)
-          } else {
-            nameToGroups += (ga -> gb)
-          }
-        }
-      }
-
-      def findGroup(x: String): String = {
-        // find the group of x
-        var r: String = x
-        while (!nameToGroups(r).equals(r)) {
-          r = nameToGroups(r)
-        }
-
-        // point all name to the group name directly
-        var a: String = x
-        var b: String = null
-        while (!nameToGroups(a).equals(r)) {
-          b = nameToGroups(a)
-          nameToGroups += (a -> r)
-          a = b
-        }
-        r
-      }
-
-      // init groups
-      inNames.foreach(e => nameToGroups += (e -> e))
-      inKeys.foreach(e => nameToGroups += (e._1 -> e._2))
-      // merge groups
-      joinKeys.foreach(e => merge(e._1, e._2))
-      // make sure all name point to the group name directly
-      inNames.foreach(findGroup)
-
-      val outputGroups = inKeys.map(e => nameToGroups(e._1)).distinct
-      Some(
-        inNames
-          .filter(e => outputGroups.contains(nameToGroups(e)))
-          .map(e => (e, nameToGroups(e)))
-      )
-    }
-  }
 }
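
[Editor's illustration] The rewritten getUniqueKeyFields above asks the Calcite metadata
query for unique keys as sets of field indices and then maps them to field names. The
index-to-name step can be shown with a small standalone sketch; the field names and index
sets below are made-up sample values, with no Calcite types involved.

    object UniqueKeyNamesSketch {
      def main(args: Array[String]): Unit = {
        // what rowType.getFieldNames would return for the output of the plan (sample data)
        val fieldNames = Vector("id", "num", "text")
        // what the metadata query would return: sets of key field indices (sample data)
        val uniqueKeyIndices: Set[Set[Int]] = Set(Set(0), Set(0, 1))

        // same shape as the code above: drop empty sets, map indices to names
        val uniqueKeyNames: Array[Array[String]] = uniqueKeyIndices
          .filter(_.nonEmpty)
          .map(_.toArray.sorted.map(fieldNames))
          .toArray

        uniqueKeyNames.foreach(k => println(k.mkString("[", ",", "]")))
        // prints (in some order): [id] and [id,num]
      }
    }
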
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/Limit0RemoveITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/Limit0RemoveITCase.scala
index 3b1763e..824907d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/Limit0RemoveITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/Limit0RemoveITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.runtime.stream.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestSinkUtil, TestingAppendTableSink, TestingUpsertTableSink}
+import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestSinkUtil, TestingAppendTableSink, TestingRetractTableSink}
 
 import org.junit.Assert.assertEquals
 import org.junit.Test
@@ -134,7 +134,7 @@ class Limit0RemoveITCase extends StreamingTestBase() {
     val sql = "SELECT * FROM MyTable1 WHERE EXISTS (SELECT a FROM MyTable2 LIMIT 0)"
 
     val result = tEnv.sqlQuery(sql)
-    val sink = TestSinkUtil.configureSink(result, new TestingUpsertTableSink(Array(0)))
+    val sink = TestSinkUtil.configureSink(result, new TestingRetractTableSink())
     tEnv.registerTableSink("MySink", sink)
     tEnv.insertInto(result, "MySink")
     tEnv.execute("test")
@@ -155,13 +155,13 @@ class Limit0RemoveITCase extends StreamingTestBase() {
     val sql = "SELECT * FROM MyTable1 WHERE NOT EXISTS (SELECT a FROM MyTable2 LIMIT 0)"
 
     val result = tEnv.sqlQuery(sql)
-    val sink = TestSinkUtil.configureSink(result, new TestingUpsertTableSink(Array(0)))
+    val sink = TestSinkUtil.configureSink(result, new TestingRetractTableSink())
     tEnv.registerTableSink("MySink", sink)
     tEnv.insertInto(result, "MySink")
     tEnv.execute("test")
 
     val expected = Seq("1", "2", "3", "4", "5", "6")
-    assertEquals(expected, sink.getUpsertResults.sorted)
+    assertEquals(expected, sink.getRetractResults.sorted)
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala
index b8677f92..ceeeed3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/RankITCase.scala
@@ -1069,7 +1069,7 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
 
     val table = tEnv.sqlQuery(sql)
     val schema = table.getSchema
-    val sink = new TestingUpsertTableSink(Array(0, 2)).
+    val sink = new TestingRetractTableSink().
       configure(schema.getFieldNames,
         schema.getFieldDataTypes.map(_.nullable()).map(fromDataTypeToTypeInfo))
     tEnv.registerTableSink("MySink", sink)
@@ -1092,7 +1092,7 @@ class RankITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
       "book,20,5",
       "fruit,40,1",
       "fruit,44,3")
-    assertEquals(updatedExpected.sorted, sink.getUpsertResults.sorted)
+    assertEquals(updatedExpected.sorted, sink.getRetractResults.sorted)
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
index 54b7f9e..306b559 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/JoinITCase.scala
@@ -871,14 +871,14 @@ class JoinITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
       .where('leftPk === 'rightPk)
       .select('leftPk, 'leftA, 'rightPk, 'rightA)
     val schema = t.getSchema
-    val sink = new TestingUpsertTableSink(Array(0, 1, 2)).configure(
+    val sink = new TestingRetractTableSink().configure(
       schema.getFieldNames, schema.getFieldTypes)
     tEnv.registerTableSink("MySink", sink)
     tEnv.insertInto(t, "MySink")
     tEnv.execute("test")
 
     val expected = Seq("1,4,1,2", "1,5,1,2")
-    assertEquals(expected.sorted, sink.getUpsertResults.sorted)
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
new file mode 100644
index 0000000..aa6b17d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableException, Tumble, Types}
+import org.apache.flink.table.planner.runtime.utils.TestData.{smallTupleData3, tupleData3, tupleData5}
+import org.apache.flink.table.planner.runtime.utils.{TestingAppendTableSink, TestingRetractTableSink, TestingUpsertTableSink}
+import org.apache.flink.table.planner.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.table.sinks._
+import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
+import org.apache.flink.types.Row
+
+import org.junit.Assert._
+import org.junit.Test
+
+import java.io.File
+import java.util.TimeZone
+
+import scala.collection.JavaConverters._
+
+class TableSinkITCase extends AbstractTestBase {
+
+  @Test
+  def testInsertIntoRegisteredTableSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val tEnv = StreamTableEnvironment.create(env)
+    MemoryTableSourceSinkUtil.clear()
+
+    val input = env.fromCollection(tupleData3)
+      .assignAscendingTimestamps(r => r._2)
+    val fieldNames = Array("d", "e", "t")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.SQL_TIMESTAMP, Types.LONG)
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", sink.configure(fieldNames, fieldTypes))
+
+    input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
+      .where('a < 3 || 'a > 19)
+      .select('c, 't, 'b)
+      .insertInto("targetTable")
+    env.execute()
+
+    val expected = Seq(
+      "Hi,1970-01-01 00:00:00.001,1",
+      "Hello,1970-01-01 00:00:00.002,2",
+      "Comment#14,1970-01-01 00:00:00.006,6",
+      "Comment#15,1970-01-01 00:00:00.006,6").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(MemoryTableSourceSinkUtil.tableData.asJava, expected)
+  }
+
+  @Test
+  def testStreamTableSink(): Unit = {
+
+    val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
+    tmpFile.deleteOnExit()
+    val path = tmpFile.toURI.toString
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val tEnv = StreamTableEnvironment.create(env)
+    env.setParallelism(4)
+
+    tEnv.registerTableSink(
+      "csvSink",
+      new CsvTableSink(path).configure(
+        Array[String]("c", "b"),
+        Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP)))
+
+    val input = env.fromCollection(tupleData3)
+      .assignAscendingTimestamps(_._2)
+      .map(x => x).setParallelism(4) // increase DOP to 4
+
+    input.toTable(tEnv, 'a, 'b.rowtime, 'c)
+      .where('a < 5 || 'a > 17)
+      .select('c, 'b)
+      .insertInto("csvSink")
+
+    env.execute()
+
+    val expected = Seq(
+      "Hi,1970-01-01 00:00:00.001",
+      "Hello,1970-01-01 00:00:00.002",
+      "Hello world,1970-01-01 00:00:00.002",
+      "Hello world, how are you?,1970-01-01 00:00:00.003",
+      "Comment#12,1970-01-01 00:00:00.006",
+      "Comment#13,1970-01-01 00:00:00.006",
+      "Comment#14,1970-01-01 00:00:00.006",
+      "Comment#15,1970-01-01 00:00:00.006").mkString("\n")
+
+    TestBaseUtils.compareResultsByLinesInMemory(expected, path)
+  }
+
+  @Test
+  def testAppendSinkOnAppendTable(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val t = env.fromCollection(tupleData3)
+        .assignAscendingTimestamps(_._1.toLong)
+        .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    val sink = new TestingAppendTableSink(TimeZone.getDefault)
+    tEnv.registerTableSink(
+      "appendSink",
+      sink.configure(
+        Array[String]("t", "icnt", "nsum"),
+        Array[TypeInformation[_]](Types.SQL_TIMESTAMP, Types.LONG, Types.LONG)))
+
+    t.window(Tumble over 5.millis on 'rowtime as 'w)
+      .groupBy('w)
+      .select('w.end as 't, 'id.count as 'icnt, 'num.sum as 'nsum)
+      .insertInto("appendSink")
+
+    env.execute()
+
+    val result = sink.getAppendResults.sorted
+    val expected = List(
+      "1970-01-01 00:00:00.005,4,8",
+      "1970-01-01 00:00:00.010,5,18",
+      "1970-01-01 00:00:00.015,5,24",
+      "1970-01-01 00:00:00.020,5,29",
+      "1970-01-01 00:00:00.025,2,12")
+      .sorted
+    assertEquals(expected, result)
+  }
+
+  @Test
+  def testAppendSinkOnAppendTableForInnerJoin(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val ds1 = env.fromCollection(smallTupleData3).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = env.fromCollection(tupleData5).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val sink = new TestingAppendTableSink
+    tEnv.registerTableSink(
+      "appendSink",
+      sink.configure(
+        Array[String]("c", "g"),
+        Array[TypeInformation[_]](Types.STRING, Types.STRING)))
+
+    ds1.join(ds2).where('b === 'e)
+      .select('c, 'g)
+      .insertInto("appendSink")
+
+    env.execute()
+
+    val result = sink.getAppendResults.sorted
+    val expected = List("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt").sorted
+    assertEquals(expected, result)
+  }
+
+  @Test
+  def testRetractSinkOnUpdatingTable(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val t = env.fromCollection(tupleData3)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text)
+
+    val sink = new TestingRetractTableSink()
+    tEnv.registerTableSink(
+      "retractSink",
+      sink.configure(
+        Array[String]("len", "icnt", "nsum"),
+        Array[TypeInformation[_]](Types.INT, Types.LONG, Types.LONG)))
+
+    t.select('id, 'num, 'text.charLength() as 'len)
+      .groupBy('len)
+      .select('len, 'id.count as 'icnt, 'num.sum as 'nsum)
+      .insertInto("retractSink")
+
+    env.execute()
+
+    val retracted = sink.getRetractResults.sorted
+    val expected = List(
+      "2,1,1",
+      "5,1,2",
+      "11,1,2",
+      "25,1,3",
+      "10,7,39",
+      "14,1,3",
+      "9,9,41").sorted
+    assertEquals(expected, retracted)
+
+  }
+
+  @Test
+  def testRetractSinkOnAppendTable(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val t = env.fromCollection(tupleData3)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    val sink = new TestingRetractTableSink(TimeZone.getDefault)
+    tEnv.registerTableSink(
+      "retractSink",
+      sink.configure(
+        Array[String]("t", "icnt", "nsum"),
+        Array[TypeInformation[_]](Types.SQL_TIMESTAMP, Types.LONG, Types.LONG)))
+
+    t.window(Tumble over 5.millis on 'rowtime as 'w)
+      .groupBy('w)
+      .select('w.end as 't, 'id.count as 'icnt, 'num.sum as 'nsum)
+      .insertInto("retractSink")
+
+    env.execute()
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      sink.getRawResults.exists(_.startsWith("(false,")))
+
+    val retracted = sink.getRetractResults.sorted
+    val expected = List(
+      "1970-01-01 00:00:00.005,4,8",
+      "1970-01-01 00:00:00.010,5,18",
+      "1970-01-01 00:00:00.015,5,24",
+      "1970-01-01 00:00:00.020,5,29",
+      "1970-01-01 00:00:00.025,2,12")
+      .sorted
+    assertEquals(expected, retracted)
+
+  }
+
+  @Test
+  def testUpsertSinkOnUpdatingTableWithFullKey(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val t = env.fromCollection(tupleData3)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text)
+
+    val sink = new TestingUpsertTableSink(Array(0, 2), TimeZone.getDefault)
+    sink.expectedKeys = Some(Array("cnt", "cTrue"))
+    sink.expectedIsAppendOnly = Some(false)
+    tEnv.registerTableSink(
+      "upsertSink",
+      sink.configure(
+        Array[String]("cnt", "lencnt", "cTrue"),
+        Array[TypeInformation[_]](Types.LONG, Types.LONG, Types.BOOLEAN)))
+
+    t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
+      .groupBy('len, 'cTrue)
+      .select('len, 'id.count as 'cnt, 'cTrue)
+      .groupBy('cnt, 'cTrue)
+      .select('cnt, 'len.count as 'lencnt, 'cTrue)
+      .insertInto("upsertSink")
+
+    env.execute()
+
+    assertTrue(
+      "Results must include delete messages",
+      sink.getRawResults.exists(_.startsWith("(false,")))
+
+    val retracted = sink.getUpsertResults.sorted
+    val expected = List(
+      "1,5,true",
+      "7,1,true",
+      "9,1,true").sorted
+    assertEquals(expected, retracted)
+
+  }
+
+  @Test
+  def testUpsertSinkOnAppendingTableWithFullKey1(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val t = env.fromCollection(tupleData3)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    val sink = new TestingUpsertTableSink(Array(0, 1, 2), TimeZone.getDefault)
+    sink.expectedKeys = Some(Array("wend", "num"))
+    sink.expectedIsAppendOnly = Some(true)
+    tEnv.registerTableSink(
+      "upsertSink",
+      sink.configure(
+        Array[String]("num", "wend", "icnt"),
+        Array[TypeInformation[_]](Types.LONG, Types.SQL_TIMESTAMP, Types.LONG)))
+
+    t.window(Tumble over 5.millis on 'rowtime as 'w)
+      .groupBy('w, 'num)
+      .select('num, 'w.end as 'wend, 'id.count as 'icnt)
+      .insertInto("upsertSink")
+
+    env.execute()
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      sink.getRawResults.exists(_.startsWith("(false,")))
+
+    val retracted = sink.getUpsertResults.sorted
+    val expected = List(
+      "1,1970-01-01 00:00:00.005,1",
+      "2,1970-01-01 00:00:00.005,2",
+      "3,1970-01-01 00:00:00.005,1",
+      "3,1970-01-01 00:00:00.010,2",
+      "4,1970-01-01 00:00:00.010,3",
+      "4,1970-01-01 00:00:00.015,1",
+      "5,1970-01-01 00:00:00.015,4",
+      "5,1970-01-01 00:00:00.020,1",
+      "6,1970-01-01 00:00:00.020,4",
+      "6,1970-01-01 00:00:00.025,2").sorted
+    assertEquals(expected, retracted)
+  }
+
+  @Test
+  def testUpsertSinkOnAppendingTableWithFullKey2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val t = env.fromCollection(tupleData3)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    val sink = new TestingUpsertTableSink(Array(0, 1, 2), TimeZone.getDefault)
+    sink.expectedKeys = Some(Array("wend", "num"))
+    sink.expectedIsAppendOnly = Some(true)
+    tEnv.registerTableSink(
+      "upsertSink",
+      sink.configure(
+        Array[String]("wstart", "wend", "num", "icnt"),
+        Array[TypeInformation[_]]
+          (Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG, Types.LONG)))
+
+    t.window(Tumble over 5.millis on 'rowtime as 'w)
+      .groupBy('w, 'num)
+      .select('w.start as 'wstart, 'w.end as 'wend, 'num, 'id.count as 'icnt)
+      .insertInto("upsertSink")
+
+    env.execute()
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      sink.getRawResults.exists(_.startsWith("(false,")))
+
+    val retracted = sink.getUpsertResults.sorted
+    val expected = List(
+      "1970-01-01 00:00:00.000,1970-01-01 00:00:00.005,1,1",
+      "1970-01-01 00:00:00.000,1970-01-01 00:00:00.005,2,2",
+      "1970-01-01 00:00:00.000,1970-01-01 00:00:00.005,3,1",
+      "1970-01-01 00:00:00.005,1970-01-01 00:00:00.010,3,2",
+      "1970-01-01 00:00:00.005,1970-01-01 00:00:00.010,4,3",
+      "1970-01-01 00:00:00.010,1970-01-01 00:00:00.015,4,1",
+      "1970-01-01 00:00:00.010,1970-01-01 00:00:00.015,5,4",
+      "1970-01-01 00:00:00.015,1970-01-01 00:00:00.020,5,1",
+      "1970-01-01 00:00:00.015,1970-01-01 00:00:00.020,6,4",
+      "1970-01-01 00:00:00.020,1970-01-01 00:00:00.025,6,2").sorted
+    assertEquals(expected, retracted)
+  }
+
+  @Test
+  def testUpsertSinkOnAppendingTableWithoutFullKey1(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val t = env.fromCollection(tupleData3)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    val sink = new TestingUpsertTableSink(Array(0), TimeZone.getDefault)
+    sink.expectedIsAppendOnly = Some(true)
+    tEnv.registerTableSink(
+      "upsertSink",
+      sink.configure(
+        Array[String]("wend", "cnt"),
+        Array[TypeInformation[_]](Types.SQL_TIMESTAMP, Types.LONG)))
+
+    t.window(Tumble over 5.millis on 'rowtime as 'w)
+      .groupBy('w, 'num)
+      .select('w.end as 'wend, 'id.count as 'cnt)
+      .insertInto("upsertSink")
+
+    env.execute()
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      sink.getRawResults.exists(_.startsWith("(false,")))
+
+    val retracted = sink.getRawResults.sorted
+    val expected = List(
+      "(true,1970-01-01 00:00:00.005,1)",
+      "(true,1970-01-01 00:00:00.005,2)",
+      "(true,1970-01-01 00:00:00.005,1)",
+      "(true,1970-01-01 00:00:00.010,2)",
+      "(true,1970-01-01 00:00:00.010,3)",
+      "(true,1970-01-01 00:00:00.015,1)",
+      "(true,1970-01-01 00:00:00.015,4)",
+      "(true,1970-01-01 00:00:00.020,1)",
+      "(true,1970-01-01 00:00:00.020,4)",
+      "(true,1970-01-01 00:00:00.025,2)").sorted
+    assertEquals(expected, retracted)
+  }
+
+  @Test
+  def testUpsertSinkOnAppendingTableWithoutFullKey2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val t = env.fromCollection(tupleData3)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    val sink = new TestingUpsertTableSink(Array(0), TimeZone.getDefault)
+    sink.expectedIsAppendOnly = Some(true)
+    tEnv.registerTableSink(
+      "upsertSink",
+      sink.configure(
+        Array[String]("num", "cnt"),
+        Array[TypeInformation[_]](Types.LONG, Types.LONG)))
+
+    t.window(Tumble over 5.millis on 'rowtime as 'w)
+      .groupBy('w, 'num)
+      .select('num, 'id.count as 'cnt)
+      .insertInto("upsertSink")
+
+    env.execute()
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      sink.getRawResults.exists(_.startsWith("(false,")))
+
+    val retracted = sink.getRawResults.sorted
+    val expected = List(
+      "(true,1,1)",
+      "(true,2,2)",
+      "(true,3,1)",
+      "(true,3,2)",
+      "(true,4,3)",
+      "(true,4,1)",
+      "(true,5,4)",
+      "(true,5,1)",
+      "(true,6,4)",
+      "(true,6,2)").sorted
+    assertEquals(expected, retracted)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testToAppendStreamMultiRowtime(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val t = env.fromCollection(tupleData3)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    val r = t
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('num, 'w)
+      .select('num, 'w.rowtime, 'w.rowtime as 'rowtime2)
+
+    r.toAppendStream[Row]
+  }
+
+  @Test(expected = classOf[TableException])
+  def testToRetractStreamMultiRowtime(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val t = env.fromCollection(tupleData3)
+      .assignAscendingTimestamps(_._1.toLong)
+      .toTable(tEnv, 'id, 'num, 'text, 'rowtime.rowtime)
+
+    val r = t
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('num, 'w)
+      .select('num, 'w.rowtime, 'w.rowtime as 'rowtime2)
+
+    r.toRetractStream[Row]
+  }
+}
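
[Editor's illustration] The upsert assertions in the new tests compare getUpsertResults,
i.e. the raw (flag, row) messages folded into a final table keyed by the declared key
fields, where a false flag means delete-by-key. That folding can be sketched as follows,
with made-up rows and a plain Map standing in for the actual TestingUpsertTableSink
internals.

    object UpsertAccumulationSketch {
      def main(args: Array[String]): Unit = {
        // raw messages as (isUpsert, key, row); false means "delete by key" (sample data)
        val raw = Seq(
          (true,  "5,true", "5,1,true"),
          (true,  "1,true", "1,1,true"),
          (false, "1,true", "1,1,true"),
          (true,  "1,true", "1,2,true"))

        // fold the change stream into the final keyed table
        val finalRows = raw.foldLeft(Map.empty[String, String]) {
          case (acc, (true, key, row)) => acc + (key -> row)
          case (acc, (false, key, _))  => acc - key
        }
        finalRows.values.toSeq.sorted.foreach(println)
        // prints: 1,2,true and 5,1,true
      }
    }
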
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
index b903eed..d44f316 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
@@ -253,17 +253,34 @@ final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone)
   var fNames: Array[String] = _
   var fTypes: Array[TypeInformation[_]] = _
   var sink = new TestingUpsertSink(keys, tz)
+  var expectedKeys: Option[Array[String]] = None
+  var expectedIsAppendOnly: Option[Boolean] = None
 
   def this(keys: Array[Int]) {
     this(keys, TimeZone.getTimeZone("UTC"))
   }
 
   override def setKeyFields(keys: Array[String]): Unit = {
-    // ignore
+    if (expectedKeys.isDefined && keys == null) {
+      throw new AssertionError("Provided key fields should not be null.")
+    } else if (expectedKeys.isEmpty) {
+      return
+    }
+    val expectedStr = expectedKeys.get.sorted.mkString(",")
+    val keysStr = keys.sorted.mkString(",")
+    if (!expectedStr.equals(keysStr)) {
+      throw new AssertionError(
+        s"Provided key fields($keysStr) do not match expected keys($expectedStr)")
+    }
   }
 
   override def setIsAppendOnly(isAppendOnly: JBoolean): Unit = {
-    // ignore
+    if (expectedIsAppendOnly.isEmpty) {
+      return
+    }
+    if (expectedIsAppendOnly.get != isAppendOnly) {
+      throw new AssertionError("Provided isAppendOnly does not match expected isAppendOnly")
+    }
   }
 
   override def getRecordType: TypeInformation[BaseRow] =