You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/09/18 22:38:19 UTC

[flink] branch master updated (0794fa8 -> cc37d7a)

This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 0794fa8  [hotfix] Move common dependencies into 'flink-filesystems'
     new f28b829  [FLINK-10259] [table] Fix identification of key attributes for GroupWindows.
     new cc37d7a  [FLINK-10079] [table] Look up sink tables in external catalogs.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/table/api/BatchTableEnvironment.scala    |   4 +-
 .../flink/table/api/StreamTableEnvironment.scala   |   4 +-
 .../apache/flink/table/api/TableEnvironment.scala  |  60 ++-
 .../table/catalog/ExternalCatalogSchema.scala      |   2 +-
 .../table/plan/util/UpdatingPlanChecker.scala      |   3 +-
 .../table/api/ExternalCatalogInsertTest.scala      | 107 ++++++
 .../table/catalog/ExternalCatalogSchemaTest.scala  |   2 +-
 .../runtime/stream/sql/InsertIntoITCase.scala      | 406 +++++++++++++++++++++
 .../flink/table/runtime/stream/sql/SqlITCase.scala |  29 --
 .../flink/table/runtime/utils/CommonTestData.scala |  23 ++
 10 files changed, 587 insertions(+), 53 deletions(-)
 create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala
 create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala


[flink] 02/02: [FLINK-10079] [table] Look up sink tables in external catalogs.

Posted by fh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cc37d7a7555aa7f18da2d0eb62c569cb080332ed
Author: jerryjzhang <zh...@163.com>
AuthorDate: Fri Sep 14 16:26:36 2018 +0800

    [FLINK-10079] [table] Look up sink tables in external catalogs.
    
    This closes #6508.
---
 .../flink/table/api/BatchTableEnvironment.scala    |   4 +-
 .../flink/table/api/StreamTableEnvironment.scala   |   4 +-
 .../apache/flink/table/api/TableEnvironment.scala  |  60 ++++++++----
 .../table/catalog/ExternalCatalogSchema.scala      |   2 +-
 .../table/api/ExternalCatalogInsertTest.scala      | 107 +++++++++++++++++++++
 .../table/catalog/ExternalCatalogSchemaTest.scala  |   2 +-
 .../flink/table/runtime/utils/CommonTestData.scala |  23 +++++
 7 files changed, 179 insertions(+), 23 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 9265f0f..04a7916 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -107,7 +107,7 @@ abstract class BatchTableEnvironment(
       // check for proper batch table source
       case batchTableSource: BatchTableSource[_] =>
         // check if a table (source or sink) is registered
-        Option(getTable(name)) match {
+        getTable(name) match {
 
           // table source and/or sink is registered
           case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
@@ -249,7 +249,7 @@ abstract class BatchTableEnvironment(
       case _: BatchTableSink[_] =>
 
         // check if a table (source or sink) is registered
-        Option(getTable(name)) match {
+        getTable(name) match {
 
           // table source and/or sink is registered
           case Some(table: TableSourceSinkTable[_, _]) => table.tableSinkTable match {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 4c73032..d31ce6c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -126,7 +126,7 @@ abstract class StreamTableEnvironment(
         }
 
         // register
-        Option(getTable(name)) match {
+        getTable(name) match {
 
           // check if a table (source or sink) is registered
           case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
@@ -273,7 +273,7 @@ abstract class StreamTableEnvironment(
       case _: StreamTableSink[_] =>
 
         // check if a table (source or sink) is registered
-        Option(getTable(name)) match {
+        getTable(name) match {
 
           // table source and/or sink is registered
           case Some(table: TableSourceSinkTable[_, _]) => table.tableSinkTable match {
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 195812d..5691ab7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -28,7 +28,6 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
 import org.apache.calcite.plan.{RelOptPlanner, RelOptUtil, RelTraitSet}
 import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.sql._
@@ -749,42 +748,42 @@ abstract class TableEnvironment(val config: TableConfig) {
     // check that sink table exists
     if (null == sinkTableName) throw TableException("Name of TableSink must not be null.")
     if (sinkTableName.isEmpty) throw TableException("Name of TableSink must not be empty.")
-    if (!isRegistered(sinkTableName)) {
-      throw TableException(s"No table was registered under the name $sinkTableName.")
-    }
 
     getTable(sinkTableName) match {
 
-      // check for registered table that wraps a sink
-      case s: TableSourceSinkTable[_, _] if s.tableSinkTable.isDefined =>
+      case None =>
+        throw TableException(s"No table was registered under the name $sinkTableName.")
+
+      case Some(s: TableSourceSinkTable[_, _]) if s.tableSinkTable.isDefined =>
         val tableSink = s.tableSinkTable.get.tableSink
         // validate schema of source table and table sink
         val srcFieldTypes = table.getSchema.getTypes
         val sinkFieldTypes = tableSink.getFieldTypes
 
         if (srcFieldTypes.length != sinkFieldTypes.length ||
-          srcFieldTypes.zip(sinkFieldTypes).exists{case (srcF, snkF) => srcF != snkF}) {
+          srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) => srcF != snkF }) {
 
           val srcFieldNames = table.getSchema.getColumnNames
           val sinkFieldNames = tableSink.getFieldNames
 
           // format table and table sink schema strings
           val srcSchema = srcFieldNames.zip(srcFieldTypes)
-            .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
+            .map { case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}" }
             .mkString("[", ", ", "]")
           val sinkSchema = sinkFieldNames.zip(sinkFieldTypes)
-            .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
+            .map { case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}" }
             .mkString("[", ", ", "]")
 
           throw ValidationException(
-            s"Field types of query result and registered TableSink $sinkTableName do not match.\n" +
+            s"Field types of query result and registered TableSink " +
+              s"$sinkTableName do not match.\n" +
               s"Query result schema: $srcSchema\n" +
               s"TableSink schema:    $sinkSchema")
         }
-
         // emit the table to the configured table sink
         writeToSink(table, tableSink, conf)
-      case _ =>
+
+      case Some(_) =>
         throw TableException(s"The table registered as $sinkTableName is not a TableSink. " +
           s"You can only emit query results to a registered TableSink.")
     }
@@ -828,12 +827,39 @@ abstract class TableEnvironment(val config: TableConfig) {
     rootSchema.getTableNames.contains(name)
   }
 
-  protected def getTable(name: String): org.apache.calcite.schema.Table = {
-    rootSchema.getTable(name)
-  }
+  /**
+    * Get a table from either internal or external catalogs.
+    *
+    * @param name The name of the table.
+    * @return The table registered either internally or externally, None otherwise.
+    */
+  protected def getTable(name: String): Option[org.apache.calcite.schema.Table] = {
+
+    // recursively fetches a table from a schema.
+    def getTableFromSchema(
+        schema: SchemaPlus,
+        path: List[String]): Option[org.apache.calcite.schema.Table] = {
+
+      path match {
+        case tableName :: Nil =>
+          // look up table
+          Option(schema.getTable(tableName))
+        case subschemaName :: remain =>
+          // look up subschema
+          val subschema = Option(schema.getSubSchema(subschemaName))
+          subschema match {
+            case Some(s) =>
+              // search for table in subschema
+              getTableFromSchema(s, remain)
+            case None =>
+              // subschema does not exist
+              None
+          }
+      }
+    }
 
-  protected def getRowType(name: String): RelDataType = {
-    rootSchema.getTable(name).getRowType(typeFactory)
+    val pathNames = name.split('.').toList
+    getTableFromSchema(rootSchema, pathNames)
   }
 
   /** Returns a unique temporary attribute name. */
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index 022b9a2..3129ad3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -96,7 +96,7 @@ class ExternalCatalogSchema(
 
   override def getFunctionNames: JSet[String] = JCollections.emptySet[String]
 
-  override def getTableNames: JSet[String] = JCollections.emptySet[String]
+  override def getTableNames: JSet[String] = new JLinkedHashSet(catalog.listTables())
 
   override def snapshot(v: SchemaVersion): Schema = this
 
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala
new file mode 100644
index 0000000..4b1fb18
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.CommonTestData
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+/**
+  * Test for inserting into tables from external catalog.
+  */
+class ExternalCatalogInsertTest extends TableTestBase {
+  private val tableBatchEnv = TableEnvironment.getTableEnvironment(
+    ExecutionEnvironment.getExecutionEnvironment)
+  private val tableStreamEnv = TableEnvironment.getTableEnvironment(
+    StreamExecutionEnvironment.getExecutionEnvironment)
+
+  @Test
+  def testBatchTableApi(): Unit = {
+    tableBatchEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = false))
+
+    val table1 = tableBatchEnv.scan("test", "db1", "tb1")
+    val table2 = tableBatchEnv.scan("test", "db2", "tb2")
+    table2.select('d * 2, 'e, 'g.upperCase())
+      .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+      .insertInto("test.db3.tb3")
+  }
+
+  @Test
+  def testBatchSQL(): Unit = {
+    tableBatchEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = false))
+
+    val sqlInsert = "INSERT INTO `test.db3.tb3` SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 " +
+      "UNION ALL (SELECT a * 2, b, c FROM test.db1.tb1)"
+
+    tableBatchEnv.sqlUpdate(sqlInsert)
+  }
+
+  @Test
+  def testStreamTableApi(): Unit = {
+    var tableEnv = tableStreamEnv
+
+    tableEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = true))
+
+    val table1 = tableEnv.scan("test", "db1", "tb1")
+    val table2 = tableEnv.scan("test", "db2", "tb2")
+
+    table2.where("d < 3")
+      .select('d * 2, 'e, 'g.upperCase())
+      .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+      .insertInto("test.db3.tb3")
+  }
+
+  @Test
+  def testStreamSQL(): Unit = {
+    var tableEnv = tableStreamEnv
+
+    tableEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = true))
+
+    val sqlInsert = "INSERT INTO `test.db3.tb3` SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 " +
+      "UNION ALL (SELECT a * 2, b, c FROM test.db1.tb1)"
+
+    tableEnv.sqlUpdate(sqlInsert)
+  }
+
+  @Test
+  def testTopLevelTable(): Unit = {
+    var tableEnv = tableBatchEnv
+
+    tableEnv.registerExternalCatalog(
+      "test",
+      CommonTestData.getInMemoryTestCatalog(isStreaming = false))
+
+    val table1 = tableEnv.scan("test", "tb1")
+    val table2 = tableEnv.scan("test", "db2", "tb2")
+    table2.select('d * 2, 'e, 'g.upperCase())
+      .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+      .insertInto("test.tb3")
+  }
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
index c98a7c1..2ca7fba 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala
@@ -71,7 +71,7 @@ class ExternalCatalogSchemaTest extends TableTestBase {
         .filter(_.getType.equals(SqlMonikerType.SCHEMA))
         .map(_.getFullyQualifiedNames.asScala.toList).toSet
     assertTrue(Set(List(schemaName), List(schemaName, "db1"),
-      List(schemaName, "db2")) == subSchemas)
+      List(schemaName, "db2"), List(schemaName, "db3")) == subSchemas)
   }
 
   @Test
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index e62396f..64fcc8a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -129,16 +129,39 @@ object CommonTestData {
       externalTableBuilder2.inAppendMode()
     }
 
+    val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
+    val connDesc3 = FileSystem().path(tempFilePath3)
+    val formatDesc3 = Csv()
+      .field("x", Types.INT)
+      .field("y", Types.LONG)
+      .field("z", Types.STRING)
+      .fieldDelimiter("#")
+    val schemaDesc3 = Schema()
+      .field("x", Types.INT)
+      .field("y", Types.LONG)
+      .field("z", Types.STRING)
+    val externalTableBuilder3 = ExternalCatalogTable.builder(connDesc3)
+      .withFormat(formatDesc3)
+      .withSchema(schemaDesc3)
+
+    if (isStreaming) {
+      externalTableBuilder3.inAppendMode()
+    }
+
     val catalog = new InMemoryExternalCatalog("test")
     val db1 = new InMemoryExternalCatalog("db1")
     val db2 = new InMemoryExternalCatalog("db2")
+    val db3 = new InMemoryExternalCatalog("db3")
     catalog.createSubCatalog("db1", db1, ignoreIfExists = false)
     catalog.createSubCatalog("db2", db2, ignoreIfExists = false)
+    catalog.createSubCatalog("db3", db3, ignoreIfExists = false)
 
     // Register the table with both catalogs
     catalog.createTable("tb1", externalTableBuilder1.asTableSource(), ignoreIfExists = false)
+    catalog.createTable("tb3", externalTableBuilder3.asTableSink(), ignoreIfExists = false)
     db1.createTable("tb1", externalTableBuilder1.asTableSource(), ignoreIfExists = false)
     db2.createTable("tb2", externalTableBuilder2.asTableSource(), ignoreIfExists = false)
+    db3.createTable("tb3", externalTableBuilder3.asTableSink(), ignoreIfExists = false)
     catalog
   }
 


[flink] 01/02: [FLINK-10259] [table] Fix identification of key attributes for GroupWindows.

Posted by fh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f28b82909c3c6bcbe0436cae41af9a3c001f1c36
Author: Fabian Hueske <fh...@apache.org>
AuthorDate: Thu Aug 30 15:39:09 2018 +0200

    [FLINK-10259] [table] Fix identification of key attributes for GroupWindows.
    
    This closes #6641.
---
 .../table/plan/util/UpdatingPlanChecker.scala      |   3 +-
 .../runtime/stream/sql/InsertIntoITCase.scala      | 406 +++++++++++++++++++++
 .../flink/table/runtime/stream/sql/SqlITCase.scala |  29 --
 3 files changed, 408 insertions(+), 30 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
index 4b7d0ed..c478987 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
@@ -142,7 +142,8 @@ object UpdatingPlanChecker {
             .map(_.name)
           // we have only a unique key if at least one window property is selected
           if (windowProperties.nonEmpty) {
-            Some(groupKeys.map(e => (e, e)) ++ windowProperties.map(e => (e, e)))
+            val windowId = windowProperties.min
+            Some(groupKeys.map(e => (e, e)) ++ windowProperties.map(e => (e, windowId)))
           } else {
             None
           }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala
new file mode 100644
index 0000000..efba026
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/InsertIntoITCase.scala
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.runtime.stream.table.{RowCollector, TestRetractSink, TestUpsertSink}
+import org.apache.flink.table.runtime.utils.{StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class InsertIntoITCase extends StreamingWithStateTestBase {
+
+  @Test
+  def testInsertIntoAppendStreamToTableSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSourceSinkUtil.clear()
+
+    val input = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(r => r._2)
+
+    tEnv.registerDataStream("sourceTable", input, 'a, 'b, 'c, 't.rowtime)
+
+    val fieldNames = Array("d", "e", "t")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.SQL_TIMESTAMP, Types.LONG)
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT c, t, b
+         |FROM sourceTable
+         |WHERE a < 3 OR a > 19
+       """.stripMargin)
+
+    env.execute()
+
+    val expected = Seq(
+      "Hi,1970-01-01 00:00:00.001,1",
+      "Hello,1970-01-01 00:00:00.002,2",
+      "Comment#14,1970-01-01 00:00:00.006,6",
+      "Comment#15,1970-01-01 00:00:00.006,6").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(MemoryTableSourceSinkUtil.tableData.asJava, expected)
+  }
+
+  @Test
+  def testInsertIntoUpdatingTableToRetractSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("len", "cntid", "sumnum"),
+      Array(Types.INT, Types.LONG, Types.LONG),
+      new TestRetractSink)
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT len, COUNT(id) AS cntid, SUM(num) AS sumnum
+         |FROM (SELECT id, num, CHAR_LENGTH(text) AS len FROM sourceTable)
+         |GROUP BY len
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    val retracted = RowCollector.retractResults(results).sorted
+    val expected = List(
+      "2,1,1",
+      "5,1,2",
+      "11,1,2",
+      "25,1,3",
+      "10,7,39",
+      "14,1,3",
+      "9,9,41").sorted
+    assertEquals(expected, retracted)
+
+  }
+
+  @Test
+  def testInsertIntoAppendTableToRetractSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("wend", "cntid", "sumnum"),
+      Array(Types.SQL_TIMESTAMP, Types.LONG, Types.LONG),
+      new TestRetractSink
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+         |  COUNT(id) AS cntid,
+         |  SUM(num) AS sumnum
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND)
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = RowCollector.retractResults(results).sorted
+    val expected = List(
+      "1970-01-01 00:00:00.005,4,8",
+      "1970-01-01 00:00:00.01,5,18",
+      "1970-01-01 00:00:00.015,5,24",
+      "1970-01-01 00:00:00.02,5,29",
+      "1970-01-01 00:00:00.025,2,12")
+      .sorted
+    assertEquals(expected, retracted)
+
+  }
+
+  @Test
+  def testInsertIntoUpdatingTableWithFullKeyToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("cnt", "cntid", "cTrue"),
+      Array(Types.LONG, Types.LONG, Types.BOOLEAN),
+      new TestUpsertSink(Array("cnt", "cTrue"), false)
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT cnt, COUNT(len) AS cntid, cTrue
+         |FROM
+         |  (SELECT CHAR_LENGTH(text) AS len, (id > 0) AS cTrue, COUNT(id) AS cnt
+         |   FROM sourceTable
+         |   GROUP BY CHAR_LENGTH(text), (id > 0)
+         |   )
+         |GROUP BY cnt, cTrue
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertTrue(
+      "Results must include delete messages",
+      results.exists(_.f0 == false)
+    )
+
+    val retracted = RowCollector.upsertResults(results, Array(0, 2)).sorted
+    val expected = List(
+      "1,5,true",
+      "7,1,true",
+      "9,1,true").sorted
+    assertEquals(expected, retracted)
+
+  }
+
+  @Test
+  def testInsertIntoAppendingTableWithFullKey1ToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("num", "wend", "cntid"),
+      Array(Types.LONG, Types.SQL_TIMESTAMP, Types.LONG),
+      new TestUpsertSink(Array("wend", "num"), true)
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  num,
+         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+         |  COUNT(id) AS cntid
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = RowCollector.upsertResults(results, Array(0, 1)).sorted
+    val expected = List(
+      "1,1970-01-01 00:00:00.005,1",
+      "2,1970-01-01 00:00:00.005,2",
+      "3,1970-01-01 00:00:00.005,1",
+      "3,1970-01-01 00:00:00.01,2",
+      "4,1970-01-01 00:00:00.01,3",
+      "4,1970-01-01 00:00:00.015,1",
+      "5,1970-01-01 00:00:00.015,4",
+      "5,1970-01-01 00:00:00.02,1",
+      "6,1970-01-01 00:00:00.02,4",
+      "6,1970-01-01 00:00:00.025,2").sorted
+    assertEquals(expected, retracted)
+  }
+
+  @Test
+  def testInsertIntoAppendingTableWithFullKey2ToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("wstart", "wend", "num", "cntid"),
+      Array(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG, Types.LONG),
+      new TestUpsertSink(Array("wstart", "wend", "num"), true)
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) AS wstart,
+         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+         |  num,
+         |  COUNT(id) AS cntid
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = RowCollector.upsertResults(results, Array(0, 1, 2)).sorted
+    val expected = List(
+      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,1,1",
+      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,2,2",
+      "1970-01-01 00:00:00.0,1970-01-01 00:00:00.005,3,1",
+      "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,3,2",
+      "1970-01-01 00:00:00.005,1970-01-01 00:00:00.01,4,3",
+      "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,4,1",
+      "1970-01-01 00:00:00.01,1970-01-01 00:00:00.015,5,4",
+      "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,5,1",
+      "1970-01-01 00:00:00.015,1970-01-01 00:00:00.02,6,4",
+      "1970-01-01 00:00:00.02,1970-01-01 00:00:00.025,6,2").sorted
+    assertEquals(expected, retracted)
+  }
+
+  @Test
+  def testInsertIntoAppendingTableWithoutFullKey1ToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("wend", "cntid"),
+      Array(Types.SQL_TIMESTAMP, Types.LONG),
+      new TestUpsertSink(null, true)
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND) AS wend,
+         |  COUNT(id) AS cntid
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = results.map(_.f1.toString).sorted
+    val expected = List(
+      "1970-01-01 00:00:00.005,1",
+      "1970-01-01 00:00:00.005,2",
+      "1970-01-01 00:00:00.005,1",
+      "1970-01-01 00:00:00.01,2",
+      "1970-01-01 00:00:00.01,3",
+      "1970-01-01 00:00:00.015,1",
+      "1970-01-01 00:00:00.015,4",
+      "1970-01-01 00:00:00.02,1",
+      "1970-01-01 00:00:00.02,4",
+      "1970-01-01 00:00:00.025,2").sorted
+    assertEquals(expected, retracted)
+  }
+
+  @Test
+  def testInsertIntoAppendingTableWithoutFullKey2ToUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val t = StreamTestData.get3TupleDataStream(env)
+      .assignAscendingTimestamps(_._1.toLong)
+
+    tEnv.registerDataStream("sourceTable", t, 'id, 'num, 'text, 'rowtime.rowtime)
+    tEnv.registerTableSink(
+      "targetTable",
+      Array("num", "cntid"),
+      Array(Types.LONG, Types.LONG),
+      new TestUpsertSink(null, true)
+    )
+
+    tEnv.sqlUpdate(
+      s"""INSERT INTO targetTable
+         |SELECT
+         |  num,
+         |  COUNT(id) AS cntid
+         |FROM sourceTable
+         |GROUP BY TUMBLE(rowtime, INTERVAL '0.005' SECOND), num
+       """.stripMargin)
+
+    env.execute()
+    val results = RowCollector.getAndClearValues
+
+    assertFalse(
+      "Received retraction messages for append only table",
+      results.exists(!_.f0))
+
+    val retracted = results.map(_.f1.toString).sorted
+    val expected = List(
+      "1,1",
+      "2,2",
+      "3,1",
+      "3,2",
+      "4,3",
+      "4,1",
+      "5,4",
+      "5,1",
+      "6,4",
+      "6,2").sorted
+    assertEquals(expected, retracted)
+  }
+}
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index f187055..de0b392 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -714,35 +714,6 @@ class SqlITCase extends StreamingWithStateTestBase {
   }
 
   @Test
-  def testInsertIntoMemoryTable(): Unit = {
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    MemoryTableSourceSinkUtil.clear()
-
-    val t = StreamTestData.getSmall3TupleDataStream(env)
-        .assignAscendingTimestamps(x => x._2)
-      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-    tEnv.registerTable("sourceTable", t)
-
-    val fieldNames = Array("d", "e", "f", "t")
-    val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
-      .asInstanceOf[Array[TypeInformation[_]]]
-    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
-    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
-
-    val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable"
-    tEnv.sqlUpdate(sql)
-    env.execute()
-
-    val expected = List(
-      "1,1,Hi,1970-01-01 00:00:00.001",
-      "2,2,Hello,1970-01-01 00:00:00.002",
-      "3,2,Hello world,1970-01-01 00:00:00.002")
-    assertEquals(expected.sorted, MemoryTableSourceSinkUtil.tableDataStrings.sorted)
-  }
-
-  @Test
   def testWriteReadTableSourceSink(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)