Posted to commits@flink.apache.org by fh...@apache.org on 2018/09/19 19:58:28 UTC

[flink] branch release-1.6 updated (5b2b0f7 -> 65cd7b5)

This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a change to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 5b2b0f7  [FLINK-10332][network] move data notification out of the synchronized block
     new 290d96e  [FLINK-10259] [table] Fix identification of key attributes for GroupWindows.
     new 65cd7b5  [FLINK-10079] [table] Look up sink tables in external catalogs.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/table/api/BatchTableEnvironment.scala    |   4 +-
 .../flink/table/api/StreamTableEnvironment.scala   |   4 +-
 .../apache/flink/table/api/TableEnvironment.scala  |  60 ++-
 .../table/catalog/ExternalCatalogSchema.scala      |   2 +-
 .../table/plan/util/UpdatingPlanChecker.scala      |   3 +-
 .../table/api/ExternalCatalogInsertTest.scala      | 107 ++++++
 .../table/catalog/ExternalCatalogSchemaTest.scala  |   2 +-
 .../runtime/stream/sql/InsertIntoITCase.scala      | 406 +++++++++++++++++++++
 .../flink/table/runtime/stream/sql/SqlITCase.scala |  29 --
 .../flink/table/runtime/utils/CommonTestData.scala |  23 ++
 10 files changed, 587 insertions(+), 53 deletions(-)
 create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala
 create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala


[flink] 02/02: [FLINK-10079] [table] Look up sink tables in external catalogs.

Posted by fh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 65cd7b55eb7c0c7af840eebdddf83ae46d28e204
Author: jerryjzhang <zh...@163.com>
AuthorDate: Fri Sep 14 16:26:36 2018 +0800

    [FLINK-10079] [table] Look up sink tables in external catalogs.
    
    This closes #6508.
---
 .../flink/table/api/BatchTableEnvironment.scala    |   4 +-
 .../flink/table/api/StreamTableEnvironment.scala   |   4 +-
 .../apache/flink/table/api/TableEnvironment.scala  |  60 ++++++++----
 .../table/catalog/ExternalCatalogSchema.scala      |   2 +-
 .../table/api/ExternalCatalogInsertTest.scala      | 107 +++++++++++++++++++++
 .../table/catalog/ExternalCatalogSchemaTest.scala  |   2 +-
 .../flink/table/runtime/utils/CommonTestData.scala |  23 +++++
 7 files changed, 179 insertions(+), 23 deletions(-)
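
In effect, INSERT INTO statements and Table API insertInto() calls can now
target sink tables that live in a registered external catalog, addressed by
a dotted path. A minimal Scala sketch of the usage this enables, modeled on
the tests added below (the `catalog` instance is an assumption; see the
CommonTestData changes for how the test catalog is built):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment

    // Register an external catalog and write to one of its sink tables.
    // `catalog` is assumed to contain a sub-catalog "db3" with a sink
    // table "tb3", as in the test catalog below.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    tEnv.registerExternalCatalog("test", catalog)

    // The sink is resolved by walking the catalog schemas along the path.
    tEnv.sqlUpdate(
      "INSERT INTO `test.db3.tb3` SELECT d * 2, e, g FROM test.db2.tb2")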

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 9265f0f..04a7916 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -107,7 +107,7 @@ abstract class BatchTableEnvironment(
       // check for proper batch table source
       case batchTableSource: BatchTableSource[_] =>
         // check if a table (source or sink) is registered
-        Option(getTable(name)) match {
+        getTable(name) match {
 
           // table source and/or sink is registered
           case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
@@ -249,7 +249,7 @@ abstract class BatchTableEnvironment(
       case _: BatchTableSink[_] =>
 
         // check if a table (source or sink) is registered
-        Option(getTable(name)) match {
+        getTable(name) match {
 
           // table source and/or sink is registered
           case Some(table: TableSourceSinkTable[_, _]) => table.tableSinkTable match {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 4c73032..d31ce6c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -126,7 +126,7 @@ abstract class StreamTableEnvironment(
         }
 
         // register
-        Option(getTable(name)) match {
+        getTable(name) match {
 
           // check if a table (source or sink) is registered
           case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
@@ -273,7 +273,7 @@ abstract class StreamTableEnvironment(
       case _: StreamTableSink[_] =>
 
         // check if a table (source or sink) is registered
-        Option(getTable(name)) match {
+        getTable(name) match {
 
           // table source and/or sink is registered
           case Some(table: TableSourceSinkTable[_, _]) => table.tableSinkTable match {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 195812d..5691ab7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -28,7 +28,6 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
 import org.apache.calcite.plan.{RelOptPlanner, RelOptUtil, RelTraitSet}
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql._
@@ -749,42 +748,42 @@ abstract class TableEnvironment(val config: TableConfig) {
     // check that sink table exists
     if (null == sinkTableName) throw TableException("Name of TableSink must not be null.")
     if (sinkTableName.isEmpty) throw TableException("Name of TableSink must not be empty.")
-    if (!isRegistered(sinkTableName)) {
-      throw TableException(s"No table was registered under the name $sinkTableName.")
-    }
 
     getTable(sinkTableName) match {
 
-      // check for registered table that wraps a sink
-      case s: TableSourceSinkTable[_, _] if s.tableSinkTable.isDefined =>
+      case None =>
+        throw TableException(s"No table was registered under the name $sinkTableName.")
+
+      case Some(s: TableSourceSinkTable[_, _]) if s.tableSinkTable.isDefined =>
         val tableSink = s.tableSinkTable.get.tableSink
         // validate schema of source table and table sink
         val srcFieldTypes = table.getSchema.getTypes
         val sinkFieldTypes = tableSink.getFieldTypes
 
         if (srcFieldTypes.length != sinkFieldTypes.length ||
-          srcFieldTypes.zip(sinkFieldTypes).exists{case (srcF, snkF) => srcF != snkF}) {
+          srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) => srcF != snkF }) {
 
           val srcFieldNames = table.getSchema.getColumnNames
           val sinkFieldNames = tableSink.getFieldNames
 
           // format table and table sink schema strings
           val srcSchema = srcFieldNames.zip(srcFieldTypes)
-            .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
+            .map { case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}" }
             .mkString("[", ", ", "]")
           val sinkSchema = sinkFieldNames.zip(sinkFieldTypes)
-            .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
+            .map { case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}" }
             .mkString("[", ", ", "]")
 
           throw ValidationException(
-            s"Field types of query result and registered TableSink $sinkTableName do not match.\n" +
+            s"Field types of query result and registered TableSink " +
+              s"$sinkTableName do not match.\n" +
               s"Query result schema: $srcSchema\n" +
               s"TableSink schema:    $sinkSchema")
         }
-
         // emit the table to the configured table sink
         writeToSink(table, tableSink, conf)
-      case _ =>
+
+      case Some(_) =>
         throw TableException(s"The table registered as $sinkTableName is not a TableSink. " +
           s"You can only emit query results to a registered TableSink.")
     }
@@ -828,12 +827,39 @@ abstract class TableEnvironment(val config: TableConfig) {
     rootSchema.getTableNames.contains(name)
   }
 
-  protected def getTable(name: String): org.apache.calcite.schema.Table = {
-    rootSchema.getTable(name)
-  }
+  /**
+    * Get a table from either internal or external catalogs.
+    *
+    * @param name The name of the table.
+    * @return The table registered either internally or externally, None otherwise.
+    */
+  protected def getTable(name: String): Option[org.apache.calcite.schema.Table] = {
+
+    // recursively fetches a table from a schema.
+    def getTableFromSchema(
+        schema: SchemaPlus,
+        path: List[String]): Option[org.apache.calcite.schema.Table] = {
+
+      path match {
+        case tableName :: Nil =>
+          // look up table
+          Option(schema.getTable(tableName))
+        case subschemaName :: remain =>
+          // look up subschema
+          val subschema = Option(schema.getSubSchema(subschemaName))
+          subschema match {
+            case Some(s) =>
+              // search for table in subschema
+              getTableFromSchema(s, remain)
+            case None =>
+              // subschema does not exist
+              None
+          }
+      }
+    }
 
-  protected def getRowType(name: String): RelDataType = {
-    rootSchema.getTable(name).getRowType(typeFactory)
+    val pathNames = name.split('.').toList
+    getTableFromSchema(rootSchema, pathNames)
   }
 
   /** Returns a unique temporary attribute name. */
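
The new getTable splits a dotted table name and recurses through the Calcite
schema tree, one path element per level. A self-contained sketch of the same
recursion over a toy schema type (Node is hypothetical and stands in for
Calcite's SchemaPlus):

    // "test.db3.tb3" becomes List("test", "db3", "tb3"): every element but
    // the last selects a sub-schema, the last element selects the table.
    case class Node(tables: Map[String, String], subs: Map[String, Node])

    def lookup(schema: Node, path: List[String]): Option[String] = path match {
      case tableName :: Nil => schema.tables.get(tableName)
      case sub :: rest      => schema.subs.get(sub).flatMap(lookup(_, rest))
      case Nil              => None // empty path: nothing to resolve
    }

    val root = Node(Map.empty, Map("test" -> Node(Map.empty,
      Map("db3" -> Node(Map("tb3" -> "sink"), Map.empty)))))
    assert(lookup(root, "test.db3.tb3".split('.').toList).contains("sink"))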
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index adac938..c3eac8e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -94,7 +94,7 @@ class ExternalCatalogSchema(
 
   override def getFunctionNames: JSet[String] = JCollections.emptySet[String]
 
-  override def getTableNames: JSet[String] = JCollections.emptySet[String]
+  override def getTableNames: JSet[String] = new JLinkedHashSet(catalog.listTables())
 
   override def snapshot(v: SchemaVersion): Schema = this
 
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala
new file mode 100644
index 0000000..4b1fb18
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.CommonTestData
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+/**
+  * Test for inserting into tables from external catalog.
+  */
+class ExternalCatalogInsertTest extends TableTestBase {
+  private val tableBatchEnv = TableEnvironment.getTableEnvironment(
+    ExecutionEnvironment.getExecutionEnvironment)
+  private val tableStreamEnv = TableEnvironment.getTableEnvironment(
+    StreamExecutionEnvironment.getExecutionEnvironment)
+
+  @Test
+  def testBatchTableApi(): Unit = {
+    tableBatchEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = false))
+
+    val table1 = tableBatchEnv.scan("test", "db1", "tb1")
+    val table2 = tableBatchEnv.scan("test", "db2", "tb2")
+    table2.select('d * 2, 'e, 'g.upperCase())
+      .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+      .insertInto("test.db3.tb3")
+  }
+
+  @Test
+  def testBatchSQL(): Unit = {
+    tableBatchEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = false))
+
+    val sqlInsert = "INSERT INTO `test.db3.tb3` SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 " +
+      "UNION ALL (SELECT a * 2, b, c FROM test.db1.tb1)"
+
+    tableBatchEnv.sqlUpdate(sqlInsert)
+  }
+
+  @Test
+  def testStreamTableApi(): Unit = {
+    var tableEnv = tableStreamEnv
+
+    tableEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = true))
+
+    val table1 = tableEnv.scan("test", "db1", "tb1")
+    val table2 = tableEnv.scan("test", "db2", "tb2")
+
+    table2.where("d < 3")
+      .select('d * 2, 'e, 'g.upperCase())
+      .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+      .insertInto("test.db3.tb3")
+  }
+
+  @Test
+  def testStreamSQL(): Unit = {
+    var tableEnv = tableStreamEnv
+
+    tableEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = true))
+
+    val sqlInsert = "INSERT INTO `test.db3.tb3` SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 " +
+      "UNION ALL (SELECT a * 2, b, c FROM test.db1.tb1)"
+
+    tableEnv.sqlUpdate(sqlInsert)
+  }
+
+  @Test
+  def testTopLevelTable(): Unit = {
+    var tableEnv = tableBatchEnv
+
+    tableEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = false))
+
+    val table1 = tableEnv.scan("test", "tb1")
+    val table2 = tableEnv.scan("test", "db2", "tb2")
+    table2.select('d * 2, 'e, 'g.upperCase())
+      .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+      .insertInto("test.tb3")
+  }
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
index c98a7c1..2ca7fba 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
@@ -71,7 +71,7 @@ class ExternalCatalogSchemaTest extends TableTestBase {
         .filter(_.getType.equals(SqlMonikerType.SCHEMA))
         .map(_.getFullyQualifiedNames.asScala.toList).toSet
     assertTrue(Set(List(schemaName), List(schemaName, "db1"),
-      List(schemaName, "db2")) == subSchemas)
+      List(schemaName, "db2"), List(schemaName, "db3")) == subSchemas)
   }
 
   @Test
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index e62396f..64fcc8a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -129,16 +129,39 @@ object CommonTestData {
       externalTableBuilder2.inAppendMode()
     }
 
+    val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
+    val connDesc3 = FileSystem().path(tempFilePath3)
+    val formatDesc3 = Csv()
+      .field("x", Types.INT)
+      .field("y", Types.LONG)
+      .field("z", Types.STRING)
+      .fieldDelimiter("#")
+    val schemaDesc3 = Schema()
+      .field("x", Types.INT)
+      .field("y", Types.LONG)
+      .field("z", Types.STRING)
+    val externalTableBuilder3 = ExternalCatalogTable.builder(connDesc3)
+      .withFormat(formatDesc3)
+      .withSchema(schemaDesc3)
+
+    if (isStreaming) {
+      externalTableBuilder3.inAppendMode()
+    }
+
     val catalog = new InMemoryExternalCatalog("test")
     val db1 = new InMemoryExternalCatalog("db1")
     val db2 = new InMemoryExternalCatalog("db2")
+    val db3 = new InMemoryExternalCatalog("db3")
     catalog.createSubCatalog("db1", db1, ignoreIfExists = false)
     catalog.createSubCatalog("db2", db2, ignoreIfExists = false)
+    catalog.createSubCatalog("db3", db3, ignoreIfExists = false)
 
     // Register the table with both catalogs
     catalog.createTable("tb1", externalTableBuilder1.asTableSource(), ignoreIfExists = false)
+    catalog.createTable("tb3", externalTableBuilder3.asTableSink(), ignoreIfExists = false)
     db1.createTable("tb1", externalTableBuilder1.asTableSource(), ignoreIfExists = false)
     db2.createTable("tb2", externalTableBuilder2.asTableSource(), ignoreIfExists = false)
+    db3.createTable("tb3", externalTableBuilder3.asTableSink(), ignoreIfExists = false)
     catalog
   }
 


[flink] 01/02: [FLINK-10259] [table] Fix identification of key attributes for GroupWindows.

Posted by fh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 290d96e50f864468b6b2d2a3b741e0fbdaee5df0
Author: Fabian Hueske <fh...@apache.org>
AuthorDate: Thu Aug 30 15:39:09 2018 +0200

    [FLINK-10259] [table] Fix identification of key attributes for GroupWindows.
    
    This closes #6641.
---
 .../table/plan/util/UpdatingPlanChecker.scala      |   3 +-
 .../runtime/stream/sql/InsertIntoITCase.scala      | 406 +++++++++++++++++++++
 .../flink/table/runtime/stream/sql/SqlITCase.scala |  29 --
 3 files changed, 408 insertions(+), 30 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
index 4b7d0ed..c478987 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
@@ -142,7 +142,8 @@ object UpdatingPlanChecker {
             .map(_.name)
           // we have only a unique key if at least one window property is selected
           if (windowProperties.nonEmpty) {
-            Some(groupKeys.map(e => (e, e)) ++ windowProperties.map(e => (e, e)))
+            val windowId = windowProperties.min
+            Some(groupKeys.map(e => (e, e)) ++ windowProperties.map(e => (e, windowId)))
           } else {
             None
           }
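
The one-line fix above groups all selected window properties under a single
key id instead of treating each property as its own key. Because window
start, end, and rowtime are all derived from the same window, any one of
them (together with the group keys) identifies a row. A hedged sketch of the
pairing, with illustrative attribute names:

    // Before: Seq(("w$end","w$end"), ("w$start","w$start")), i.e. each
    // window property formed a separate key group, so a sink keyed on a
    // single property could not find a full key. After the fix, all window
    // properties share one group id (the minimum property name).
    val groupKeys = Seq("num")
    val windowProperties = Seq("w$end", "w$start")
    val windowId = windowProperties.min // "w$end"
    val keyPairs = groupKeys.map(e => (e, e)) ++
      windowProperties.map(e => (e, windowId))
    // keyPairs == Seq(("num","num"), ("w$end","w$end"), ("w$start","w$end"))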
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala
new file mode 100644
index 0000000..efba026
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.runtime.stream.table.{RowCollector, TestRetractSink, TestUpsertSink}
+import org.apache.flink.table.runtime.utils.{StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class InsertIntoITCase extends StreamingWithStateTestBase {
+
+  @Test
+  def testInsertIntoAppendStreamToTableSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSourceSinkUtil.clear()
+
+    val input = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(r => r._2)
+
+    tEnv.registerDataStream("sourceTable", input, 'a, 'b, 'c, 't.rowtime)
+
+    val fieldNames = Array("d", "e", "t")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.SQL_TIMESTAMP, Types.LONG)
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT c, t, b
+         |FROM sourceTable
+         |WHERE a < 3 OR a > 19
+       """.stripMargin)
+
+    env.execute()
+
+    val expected = Seq(
+      "Hi,1970-01-01 00:00:00.001,1",
+      "Hello,1970-01-01 00:00:00.002,2",
+      "Comment#14,1970-01-01 00:00:00.006,6",
+      "Comment#15,1970-01-01 00:00:00.006,6").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(MemoryTableSourceSinkUtil.tableData.asJava, expected)
+  }
+
+  @Test
+  def testInsertIntoUpdatingTableToRetractSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("len", "cntid", "sumnum"),
+      Array(Types.INT, Types.LONG, Types.LONG),
+      new TestRetractSink)
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT len, COUNT(id) AS cntid, SUM(num) AS sumnum
+         |FROM (SELECT id, num, CHAR_LENGTH(text) AS len FROM sourceTable)
+         |GROUP BY len
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    val retracted = RowCollector.retractResults(results).sorted
+    val expected = List(
+      "2,1,1",
+      "5,1,2",
+      "11,1,2",
+      "25,1,3",
+      "10,7,39",
+      "14,1,3",
+      "9,9,41").sorted
+    assertEquals(expected, retracted)
+
+  }
+
+  @Test
+  def testInsertIntoAppendTableToRetractSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("wend", "cntid", "sumnum"),
+      Array(Types.SQL_TIMESTAMP, Types.LONG, Types.LONG),
+      new TestRetractSink
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+         |  COUNT(id) AS cntid,
+         |  SUM(num) AS sumnum
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND)
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = RowCollector.retractResults(results).sorted
+    val expected = List(
+      "1970-01-01 00:00:00.005,4,8",
+      "1970-01-01 00:00:00.01,5,18",
+      "1970-01-01 00:00:00.015,5,24",
+      "1970-01-01 00:00:00.02,5,29",
+      "1970-01-01 00:00:00.025,2,12")
+      .sorted
+    assertEquals(expected, retracted)
+
+  }
+
+  @Test
+  def testInsertIntoUpdatingTableWithFullKeyToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("cnt", "cntid", "cTrue"),
+      Array(Types.LONG, Types.LONG, Types.BOOLEAN),
+      new TestUpsertSink(Array("cnt", "cTrue"), false)
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT cnt, COUNT(len) AS cntid, cTrue
+         |FROM
+         |  (SELECT CHAR_LENGTH(text) AS len, (id > 0) AS cTrue, COUNT(id) AS cnt
+         |   FROM sourceTable
+         |   GROUP BY CHAR_LENGTH(text), (id > 0)
+         |   )
+         |GROUP BY cnt, cTrue
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertTrue(
+      "Results must include delete messages",
+      results.exists(_.f0 == false)
+    )
+
+    val retracted = RowCollector.upsertResults(results, Array(0, 2)).sorted
+    val expected = List(
+      "1,5,true",
+      "7,1,true",
+      "9,1,true").sorted
+    assertEquals(expected, retracted)
+
+  }
+
+  @Test
+  def testInsertIntoAppendingTableWithFullKey1ToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("num", "wend", "cntid"),
+      Array(Types.LONG, Types.SQL_TIMESTAMP, Types.LONG),
+      new TestUpsertSink(Array("wend", "num"), true)
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  num,
+         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+         |  COUNT(id) AS cntid
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = RowCollector.upsertResults(results, Array(0, 1)).sorted
+    val expected = List(
+      "1,1970-01-01 00:00:00.005,1",
+      "2,1970-01-01 00:00:00.005,2",
+      "3,1970-01-01 00:00:00.005,1",
+      "3,1970-01-01 00:00:00.01,2",
+      "4,1970-01-01 00:00:00.01,3",
+      "4,1970-01-01 00:00:00.015,1",
+      "5,1970-01-01 00:00:00.015,4",
+      "5,1970-01-01 00:00:00.02,1",
+      "6,1970-01-01 00:00:00.02,4",
+      "6,1970-01-01 00:00:00.025,2").sorted
+    assertEquals(expected, retracted)
+  }
+
+  @Test
+  def testInsertIntoAppendingTableWithFullKey2ToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("wstart", "wend", "num", "cntid"),
+      Array(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG, Types.LONG),
+      new TestUpsertSink(Array("wstart", "wend", "num"), true)
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) AS wstart,
+         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+         |  num,
+         |  COUNT(id) AS cntid
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = RowCollector.upsertResults(results, Array(0, 1, 2)).sorted
+    val expected = List(
+      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1,1",
+      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,2,2",
+      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,3,1",
+      "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,3,2",
+      "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,4,3",
+      "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,4,1",
+      "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,5,4",
+      "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,5,1",
+      "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,6,4",
+      "1970-01-01 00:00:00.02,1970-01-01 00:00:00.025,6,2").sorted
+    assertEquals(expected, retracted)
+  }
+
+  @Test
+  def testInsertIntoAppendingTableWithoutFullKey1ToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("wend", "cntid"),
+      Array(Types.SQL_TIMESTAMP, Types.LONG),
+      new TestUpsertSink(null, true)
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+         |  COUNT(id) AS cntid
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = results.map(_.f1.toString).sorted
+    val expected = List(
+      "1970-01-01 00:00:00.005,1",
+      "1970-01-01 00:00:00.005,2",
+      "1970-01-01 00:00:00.005,1",
+      "1970-01-01 00:00:00.01,2",
+      "1970-01-01 00:00:00.01,3",
+      "1970-01-01 00:00:00.015,1",
+      "1970-01-01 00:00:00.015,4",
+      "1970-01-01 00:00:00.02,1",
+      "1970-01-01 00:00:00.02,4",
+      "1970-01-01 00:00:00.025,2").sorted
+    assertEquals(expected, retracted)
+  }
+
+  @Test
+  def testInsertIntoAppendingTableWithoutFullKey2ToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("num", "cntid"),
+      Array(Types.LONG, Types.LONG),
+      new TestUpsertSink(null, true)
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  num,
+         |  COUNT(id) AS cntid
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = results.map(_.f1.toString).sorted
+    val expected = List(
+      "1,1",
+      "2,2",
+      "3,1",
+      "3,2",
+      "4,3",
+      "4,1",
+      "5,4",
+      "5,1",
+      "6,4",
+      "6,2").sorted
+    assertEquals(expected, retracted)
+  }
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index a2d9bb2..51bea2c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -714,35 +714,6 @@ class SqlITCase extends StreamingWithStateTestBase {
   }
 
   @Test
-  def testInsertIntoMemoryTable(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    MemoryTableSourceSinkUtil.clear()
-
-    val t = StreamTestData.getSmall3TupleDataStream(env)
-        .assignAscendingTimestamps(x => x._2)
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-    tEnv.registerTable("sourceTable", t)
-
-    val fieldNames = Array("d", "e", "f", "t")
-    val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
-      .asInstanceOf[Array[TypeInformation[_]]]
-    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
-    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
-
-    val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable"
-    tEnv.sqlUpdate(sql)
-    env.execute()
-
-    val expected = List(
-      "1,1,Hi,1970-01-01 00:00:00.001",
-      "2,2,Hello,1970-01-01 00:00:00.002",
-      "3,2,Hello world,1970-01-01 00:00:00.002")
-    assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableDataStrings.sorted)
-  }
-
-  @Test
   def testWriteReadTableSourceSink(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)