Posted to commits@flink.apache.org by lz...@apache.org on 2021/12/08 09:04:09 UTC

[flink] branch release-1.14 updated: [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 7c5ddbd  [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key
7c5ddbd is described below

commit 7c5ddbd201005e55ab68b4db7ee74c7cbeb13400
Author: lincoln lee <li...@gmail.com>
AuthorDate: Wed Dec 8 17:03:37 2021 +0800

    [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key
    
    This closes #18048
---
 .../kafka/table/KafkaChangelogTableITCase.java     |   4 +
 .../nodes/physical/stream/StreamPhysicalSink.scala |  61 ++----
 .../FlinkChangelogModeInferenceProgram.scala       | 111 ++++++++--
 .../table/planner/plan/stream/sql/RankTest.xml     | 225 ++++++++++++++-------
 .../planner/plan/stream/sql/agg/AggregateTest.xml  |  75 +++++++
 .../planner/plan/stream/sql/join/JoinTest.xml      | 135 +++++++++++++
 .../table/planner/plan/stream/sql/RankTest.scala   |  92 ++++++++-
 .../plan/stream/sql/agg/AggregateTest.scala        |  66 ++++++
 .../planner/plan/stream/sql/join/JoinTest.scala    | 160 +++++++++++++++
 9 files changed, 795 insertions(+), 134 deletions(-)

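For context, a minimal SQL sketch of the scenario this commit addresses, adapted from the new
testRankOutputUpsertKeyNotMatchSinkPk case below; `MyTable` and the 'values' connector are the
fixtures of that test, not new API. The query's changelog upsert key is `b` (the rank partition
key), while the sink's primary key is `a`:

    CREATE TABLE sink (
      a INT,
      b VARCHAR,
      c BIGINT,
      PRIMARY KEY (a) NOT ENFORCED
    ) WITH (
      'connector' = 'values',
      'sink-insert-only' = 'false'
    );

    INSERT INTO sink
    SELECT a, b, c FROM (
      SELECT *, ROW_NUMBER() OVER (PARTITION BY b ORDER BY c DESC) AS rn
      FROM MyTable
    )
    WHERE rn <= 100;

When the sink primary key does not match the upsert key, sending only UPDATE_AFTER rows to the
sink can produce wrong results. With this change the planner either requires UPDATE_BEFORE/UPDATE_AFTER
pairs from the input (inferSinkRequiredTraits) or enables sink upsert materialization
(analyzeUpsertMaterializeStrategy), visible as upsertMaterialize=[true] in the plans below.
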
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
index 501d470..62a7d63 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
@@ -302,10 +302,12 @@ public class KafkaChangelogTableITCase extends KafkaTableTestBase {
 
         List<String> expected =
                 Arrays.asList(
+                        "+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:38:35.477, 2020-05-13T12:38:35, 12-pack drill bits]",
                         "+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:38:35.477, 2020-05-13T12:38:35, spare tire]",
                         "+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:06.301, 2020-05-13T12:39:06, hammer]",
                         "+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:09.489, 2020-05-13T12:39:09, rocks]",
                         "+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:18.230, 2020-05-13T12:39:18, jacket]",
+                        "+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:42:33.939, 2020-05-13T12:42:33, car battery]",
                         "+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:42:33.939, 2020-05-13T12:42:33, scooter]");
 
         waitingExpectedResults("sink", expected, Duration.ofSeconds(10));
@@ -435,10 +437,12 @@ public class KafkaChangelogTableITCase extends KafkaTableTestBase {
 
         List<String> expected =
                 Arrays.asList(
+                        "+I[changelog_maxwell, test, product, null, 2020-08-06T03:34:43, 12-pack drill bits]",
                         "+I[changelog_maxwell, test, product, null, 2020-08-06T03:34:43, spare tire]",
                         "+I[changelog_maxwell, test, product, null, 2020-08-06T03:34:53, hammer]",
                         "+I[changelog_maxwell, test, product, null, 2020-08-06T03:34:57, rocks]",
                         "+I[changelog_maxwell, test, product, null, 2020-08-06T03:35:06, jacket]",
+                        "+I[changelog_maxwell, test, product, null, 2020-08-06T03:35:28, car battery]",
                         "+I[changelog_maxwell, test, product, null, 2020-08-06T03:35:28, scooter]");
 
         waitingExpectedResults("sink", expected, Duration.ofSeconds(10));
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
index 58d540e..4c0984b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
@@ -18,31 +18,23 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
-import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
 import org.apache.flink.table.catalog.{ObjectIdentifier, ResolvedCatalogTable}
 import org.apache.flink.table.connector.sink.DynamicTableSink
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec
-import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.calcite.Sink
 import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, FlinkRelOptUtil, RelDescriptionWriterImpl}
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
-import org.apache.flink.types.RowKind
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.hint.RelHint
-import org.apache.calcite.util.ImmutableBitSet
 
 import java.io.{PrintWriter, StringWriter}
 import java.util
 
-import scala.collection.JavaConversions._
-
 /**
  * Stream physical RelNode to write data into an external sink defined by a
  * [[DynamicTableSink]].
@@ -55,7 +47,8 @@ class StreamPhysicalSink(
     tableIdentifier: ObjectIdentifier,
     catalogTable: ResolvedCatalogTable,
     tableSink: DynamicTableSink,
-    abilitySpecs: Array[SinkAbilitySpec])
+    abilitySpecs: Array[SinkAbilitySpec],
+    upsertMaterialize: Boolean = false)
   extends Sink(cluster, traitSet, inputRel, hints, tableIdentifier, catalogTable, tableSink)
   with StreamPhysicalRel {
 
@@ -70,7 +63,21 @@ class StreamPhysicalSink(
       tableIdentifier,
       catalogTable,
       tableSink,
-      abilitySpecs)
+      abilitySpecs,
+      upsertMaterialize)
+  }
+
+  def copy(newUpsertMaterialize: Boolean): StreamPhysicalSink = {
+    new StreamPhysicalSink(
+      cluster,
+      traitSet,
+      inputRel,
+      hints,
+      tableIdentifier,
+      catalogTable,
+      tableSink,
+      abilitySpecs,
+      newUpsertMaterialize)
   }
 
   override def translateToExecNode(): ExecNode[_] = {
@@ -83,40 +90,6 @@ class StreamPhysicalSink(
     tableSinkSpec.setTableSink(tableSink)
     val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
     tableSinkSpec.setReadableConfig(tableConfig.getConfiguration)
-
-    val primaryKeys = toScala(catalogTable.getResolvedSchema
-        .getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
-
-    val upsertMaterialize = tableConfig.getConfiguration.get(
-      ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE) match {
-      case UpsertMaterialize.FORCE => primaryKeys.nonEmpty
-      case UpsertMaterialize.NONE => false
-      case UpsertMaterialize.AUTO =>
-        val insertOnly = tableSink
-            .getChangelogMode(inputChangelogMode)
-            .containsOnly(RowKind.INSERT)
-
-        if (!insertOnly && primaryKeys.nonEmpty) {
-          val columnNames = catalogTable.getResolvedSchema.getColumnNames
-          val pks = ImmutableBitSet.of(primaryKeys.map(columnNames.indexOf): _*)
-
-          val fmq = FlinkRelMetadataQuery.reuseOrCreate(getCluster.getMetadataQuery)
-          val uniqueKeys = fmq.getUniqueKeys(getInput)
-          val changeLogUpsertKeys = fmq.getUpsertKeys(getInput)
-
-          if (uniqueKeys != null &&
-              uniqueKeys.exists(pks.contains) &&
-              !(changeLogUpsertKeys != null &&
-                  changeLogUpsertKeys.exists(pks.contains))) {
-            true
-          } else {
-            false
-          }
-        } else {
-          false
-        }
-    }
-
     new StreamExecSink(
       tableSinkSpec,
       inputChangelogMode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index e0e9a0f..30b1432 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -19,15 +19,21 @@
 package org.apache.flink.table.planner.plan.optimize.program
 
 import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
 import org.apache.flink.table.connector.ChangelogMode
+import org.apache.flink.table.planner.calcite.FlinkContext
 import org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{BEFORE_AND_AFTER, ONLY_UPDATE_AFTER, beforeAfterOrNone, onlyAfterOrNone}
 import org.apache.flink.table.planner.plan.`trait`._
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.physical.stream._
 import org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFastStrategy, RetractStrategy, UpdateFastStrategy}
 import org.apache.flink.table.planner.plan.utils._
 import org.apache.flink.table.planner.sinks.DataStreamTableSink
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType
 import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, StreamTableSink, UpsertStreamTableSink}
+import org.apache.flink.types.RowKind
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.util.ImmutableBitSet
@@ -45,7 +51,6 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
   override def optimize(
       root: RelNode,
       context: StreamOptimizeContext): RelNode = {
-
     // step1: satisfy ModifyKindSet trait
     val physicalRoot = root.asInstanceOf[StreamPhysicalRel]
     val rootWithModifyKindSet = SATISFY_MODIFY_KIND_SET_TRAIT_VISITOR.visit(
@@ -428,19 +433,9 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
         rel: StreamPhysicalRel,
         requiredTrait: UpdateKindTrait): Option[StreamPhysicalRel] = rel match {
       case sink: StreamPhysicalSink =>
-        val childModifyKindSet = getModifyKindSet(sink.getInput)
-        val onlyAfter = onlyAfterOrNone(childModifyKindSet)
-        val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
-        val sinkTrait = UpdateKindTrait.fromChangelogMode(
-          sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
-        val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
-          Seq(onlyAfter, beforeAndAfter)
-        } else if (sinkTrait.equals(BEFORE_AND_AFTER)){
-          Seq(beforeAndAfter)
-        } else {
-          Seq(UpdateKindTrait.NONE)
-        }
-        visitSink(sink, sinkRequiredTraits)
+        val sinkRequiredTraits = inferSinkRequiredTraits(sink)
+        val upsertMaterialize = analyzeUpsertMaterializeStrategy(sink)
+        visitSink(sink.copy(upsertMaterialize), sinkRequiredTraits)
 
       case sink: StreamPhysicalLegacySink[_] =>
         val childModifyKindSet = getModifyKindSet(sink.getInput)
@@ -759,6 +754,94 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
         Some(sink.copy(sinkTrait, children.head).asInstanceOf[StreamPhysicalRel])
       }
     }
+
+    /**
+     * Infers the sink's required traits from the sink node and its input. The required traits are
+     * based on the sink node's changelog mode; the only exception is when the sink's pk(s) do not
+     * exactly match the changeLogUpsertKeys and the sink's changelog mode is ONLY_UPDATE_AFTER.
+     */
+    private def inferSinkRequiredTraits(sink: StreamPhysicalSink): Seq[UpdateKindTrait] = {
+      val childModifyKindSet = getModifyKindSet(sink.getInput)
+      val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+      val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+      val sinkTrait = UpdateKindTrait.fromChangelogMode(
+        sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+      val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+        // if the sink's pk(s) do not exactly match the input changeLogUpsertKeys, fall back
+        // to beforeAndAfter mode for correctness
+        var requireBeforeAndAfter: Boolean = false
+        val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
+            .getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+
+        if (sinkDefinedPks.nonEmpty) {
+          val sinkColumns = sink.catalogTable.getResolvedSchema.getColumnNames
+          val sinkPks = ImmutableBitSet.of(sinkDefinedPks.map(sinkColumns.indexOf): _*)
+          val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+          val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+          // if the input is UA only and primary key != upsert key (the upsert key can be null),
+          // we should fall back to beforeAndAfter.
+          // Note: even if the sink pk(s) contain the input upsert key we cannot optimize to UA
+          // only; this differs from the unique key inference of batch jobs
+          if (changeLogUpsertKeys == null || !changeLogUpsertKeys.exists {_.equals(sinkPks)}) {
+            requireBeforeAndAfter = true
+          }
+        }
+        if (requireBeforeAndAfter) {
+          Seq(beforeAndAfter)
+        } else {
+          Seq(onlyAfter, beforeAndAfter)
+        }
+      } else if (sinkTrait.equals(BEFORE_AND_AFTER)){
+        Seq(beforeAndAfter)
+      } else {
+        Seq(UpdateKindTrait.NONE)
+      }
+      sinkRequiredTraits
+    }
+
+    /**
+     *  Analyzes whether to enable upsertMaterialize. Returns true in the following cases:
+     *  1. `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` is set to FORCE and the sink's primary key is nonempty.
+     *  2. `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` is set to AUTO and the sink's primary key does not
+     *  contain the upsertKeys of the input update stream.
+     */
+    private def analyzeUpsertMaterializeStrategy(sink: StreamPhysicalSink): Boolean = {
+      val tableConfig = sink.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext])
+          .getTableConfig
+      val inputChangelogMode = ChangelogPlanUtils.getChangelogMode(
+        sink.getInput.asInstanceOf[StreamPhysicalRel]).get
+      val catalogTable = sink.catalogTable
+      val primaryKeys = toScala(catalogTable.getResolvedSchema
+          .getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+      val upsertMaterialize = tableConfig.getConfiguration.get(
+        ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE) match {
+        case UpsertMaterialize.FORCE => primaryKeys.nonEmpty
+        case UpsertMaterialize.NONE => false
+        case UpsertMaterialize.AUTO =>
+          val sinkAcceptInsertOnly = sink.tableSink.getChangelogMode(inputChangelogMode)
+              .containsOnly(RowKind.INSERT)
+          val inputInsertOnly = inputChangelogMode.containsOnly(RowKind.INSERT)
+
+          if (!sinkAcceptInsertOnly && !inputInsertOnly && primaryKeys.nonEmpty) {
+            val columnNames = catalogTable.getResolvedSchema.getColumnNames
+            val pks = ImmutableBitSet.of(primaryKeys.map(columnNames.indexOf): _*)
+            val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+            val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+            // if the input has updates and primary key != upsert key (the upsert key can be null),
+            // we should enable upsertMaterialize. As an optimization, do not enable
+            // upsertMaterialize when the sink pk(s) contain the input changeLogUpsertKeys
+            if (changeLogUpsertKeys == null || !changeLogUpsertKeys.exists(pks.contains)) {
+              true
+            } else {
+              false
+            }
+          } else {
+            false
+          }
+      }
+      upsertMaterialize
+    }
   }
 
   // -------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
index 5b0f3d9..1b9c826 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
@@ -224,6 +224,83 @@ Calc(select=[a, rk, b, c])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testRankOutputLostUpsertKeyWithSinkPk">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, c, rn])
++- LogicalProject(a=[$0], c=[$2], rn=[$5])
+   +- LogicalFilter(condition=[<=($5, 100)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rn=[ROW_NUMBER() OVER (PARTITION BY $1 ORDER BY $2 DESC NULLS LAST)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[a, c, w0$o0], changelogMode=[NONE])
++- Calc(select=[a, c, w0$o0], changelogMode=[I,UB,UA,D])
+   +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[b], orderBy=[c DESC], select=[a, b, c, w0$o0], changelogMode=[I,UB,UA,D])
+      +- Exchange(distribution=[hash[b]], changelogMode=[I])
+         +- Calc(select=[a, b, c], changelogMode=[I])
+            +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[a, c, w0$o0], upsertMaterialize=[true])
++- Calc(select=[a, c, w0$o0])
+   +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[b], orderBy=[c DESC], select=[a, b, c, w0$o0])
+      +- Exchange(distribution=[hash[b]])
+         +- Calc(select=[a, b, c])
+            +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRankOutputUpsertKeyInSinkPk">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalFilter(condition=[<=($5, 100)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC NULLS LAST)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c], changelogMode=[NONE])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[a], orderBy=[c DESC], select=[a, b, c], changelogMode=[I,UB,UA,D])
+   +- Exchange(distribution=[hash[a]], changelogMode=[I])
+      +- Calc(select=[a, b, c], changelogMode=[I])
+         +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c], upsertMaterialize=[true])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[a], orderBy=[c DESC], select=[a, b, c])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, b, c])
+         +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRankOutputUpsertKeyNotMatchSinkPk">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalFilter(condition=[<=($5, 100)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rn=[ROW_NUMBER() OVER (PARTITION BY $1 ORDER BY $2 DESC NULLS LAST)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c], changelogMode=[NONE])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[b], orderBy=[c DESC], select=[a, b, c], changelogMode=[I,UB,UA,D])
+   +- Exchange(distribution=[hash[b]], changelogMode=[I])
+      +- Calc(select=[a, b, c], changelogMode=[I])
+         +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c], upsertMaterialize=[true])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[b], orderBy=[c DESC], select=[a, b, c])
+   +- Exchange(distribution=[hash[b]])
+      +- Calc(select=[a, b, c])
+         +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testRankWithAnotherRankAsInput">
     <Resource name="sql">
       <![CDATA[
@@ -925,6 +1002,45 @@ Calc(select=[a, b, c, 10:BIGINT AS $3], changelogMode=[I,UA,D])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testTopNWithGroupByConstantKey">
+    <Resource name="sql">
+      <![CDATA[
+SELECT *
+FROM (
+  SELECT a, b, count_c,
+      ROW_NUMBER() OVER (PARTITION BY a ORDER BY count_c DESC) AS row_num
+  FROM (
+SELECT a, b, COUNT(*) AS count_c
+FROM (
+SELECT *, 'cn' AS cn
+FROM MyTable
+)
+GROUP BY a, b, cn
+      ))
+WHERE row_num <= 10
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], count_c=[$2], row_num=[$3])
++- LogicalFilter(condition=[<=($3, 10)])
+   +- LogicalProject(a=[$0], b=[$1], count_c=[$3], row_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST)])
+      +- LogicalAggregate(group=[{0, 1, 2}], count_c=[COUNT()])
+         +- LogicalProject(a=[$0], b=[$1], cn=[_UTF-16LE'cn'])
+            +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, count_c, w0$o0], changelogMode=[I,UA,D])
++- Exchange(distribution=[hash[a]], changelogMode=[I,UA])
+   +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(*) AS count_c], changelogMode=[I,UA])
+      +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
+         +- Calc(select=[a, b], changelogMode=[I])
+            +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testTopNWithKeyChanged">
     <Resource name="sql">
       <![CDATA[
@@ -992,45 +1108,6 @@ Calc(select=[row_num, a, c], where=[IS NOT NULL(b)], changelogMode=[I,UB,UA,D])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testTopNWithGroupByConstantKey">
-    <Resource name="sql">
-      <![CDATA[
-SELECT *
-FROM (
-  SELECT a, b, count_c,
-      ROW_NUMBER() OVER (PARTITION BY a ORDER BY count_c DESC) AS row_num
-  FROM (
-SELECT a, b, COUNT(*) AS count_c
-FROM (
-SELECT *, 'cn' AS cn
-FROM MyTable
-)
-GROUP BY a, b, cn
-      ))
-WHERE row_num <= 10
-      ]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a=[$0], b=[$1], count_c=[$2], row_num=[$3])
-+- LogicalFilter(condition=[<=($3, 10)])
-   +- LogicalProject(a=[$0], b=[$1], count_c=[$3], row_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST)])
-      +- LogicalAggregate(group=[{0, 1, 2}], count_c=[COUNT()])
-         +- LogicalProject(a=[$0], b=[$1], cn=[_UTF-16LE'cn'])
-            +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, count_c, w0$o0], changelogMode=[I,UA,D])
-+- Exchange(distribution=[hash[a]], changelogMode=[I,UA])
-   +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(*) AS count_c], changelogMode=[I,UA])
-      +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
-         +- Calc(select=[a, b], changelogMode=[I])
-            +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testUnarySortTopNOnString">
     <Resource name="sql">
       <![CDATA[
@@ -1065,41 +1142,6 @@ Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1,
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testUpdatableRankAfterIntermediateScan">
-    <Resource name="ast">
-      <![CDATA[
-LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
-+- LogicalProject(a=[$0], b=[$1], c=[$2])
-   +- LogicalAggregate(group=[{0}], b=[MAX($1)], c=[MIN($2)])
-      +- LogicalProject(a=[$0], b=[$1], c=[$2])
-         +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-
-LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
-+- LogicalProject(a=[$0], b=[$1], c=[$2])
-   +- LogicalFilter(condition=[<($3, 3)])
-      +- LogicalProject(a=[$0], b=[$1], c=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $1 DESC NULLS LAST)])
-         +- LogicalAggregate(group=[{0}], b=[MAX($1)], c=[MIN($2)])
-            +- LogicalProject(a=[$0], b=[$1], c=[$2])
-               +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
-GroupAggregate(groupBy=[a], select=[a, MAX(b) AS b, MIN(c) AS c])(reuse_id=[1])
-+- Exchange(distribution=[hash[a]])
-   +- Calc(select=[a, b, c])
-      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
-
-Sink(table=[default_catalog.default_database.sink], fields=[a, b, c])
-+- Reused(reference_id=[1])
-
-Sink(table=[default_catalog.default_database.sink], fields=[a, b, c])
-+- Rank(strategy=[UpdateFastStrategy[0]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[b DESC], select=[a, b, c])
-   +- Exchange(distribution=[hash[a]])
-      +- Reused(reference_id=[1])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testUpdatableRankWithDeduplicate">
     <Resource name="sql">
       <![CDATA[
@@ -1137,4 +1179,39 @@ Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankS
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testUpdatableRankAfterIntermediateScan">
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalAggregate(group=[{0}], b=[MAX($1)], c=[MIN($2)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2])
+         +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+   +- LogicalFilter(condition=[<($3, 3)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $1 DESC NULLS LAST)])
+         +- LogicalAggregate(group=[{0}], b=[MAX($1)], c=[MIN($2)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2])
+               +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+GroupAggregate(groupBy=[a], select=[a, MAX(b) AS b, MIN(c) AS c])(reuse_id=[1])
++- Exchange(distribution=[hash[a]])
+   +- Calc(select=[a, b, c])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- Reused(reference_id=[1])
+
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- Rank(strategy=[UpdateFastStrategy[0]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[b DESC], select=[a, b, c])
+   +- Exchange(distribution=[hash[a]])
+      +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index c78b886..ebc6f06 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -316,6 +316,81 @@ Calc(select=[EXPR$0])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testGroupKeyInSinkPk">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, cnt])
++- LogicalAggregate(group=[{0}], b=[MAX($1)], cnt=[COUNT()])
+   +- LogicalProject(a=[$0], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, cnt], changelogMode=[NONE])
++- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS b, COUNT(*) AS cnt], changelogMode=[I,UB,UA])
+   +- Exchange(distribution=[hash[a]], changelogMode=[I])
+      +- Calc(select=[a, b], changelogMode=[I])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d], changelogMode=[I])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, cnt])
++- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS b, COUNT(*) AS cnt])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, b])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testGroupKeyNotMatchSinkPk">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[c, cnt])
++- LogicalAggregate(group=[{0}], cnt=[COUNT()])
+   +- LogicalProject(c=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[c, cnt], changelogMode=[NONE])
++- GroupAggregate(groupBy=[c], select=[c, COUNT(*) AS cnt], changelogMode=[I,UB,UA])
+   +- Exchange(distribution=[hash[c]], changelogMode=[I])
+      +- Calc(select=[c], changelogMode=[I])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d], changelogMode=[I])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[c, cnt], upsertMaterialize=[true])
++- GroupAggregate(groupBy=[c], select=[c, COUNT(*) AS cnt])
+   +- Exchange(distribution=[hash[c]])
+      +- Calc(select=[c])
+         +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testGroupResultLostUpsertKeyWithSinkPk">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[c, cnt])
++- LogicalProject(c=[$1], cnt=[$2])
+   +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT()])
+      +- LogicalProject(a=[$0], c=[$2])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[c, cnt], changelogMode=[NONE])
++- Calc(select=[c, cnt], changelogMode=[I,UB,UA])
+   +- GroupAggregate(groupBy=[a, c], select=[a, c, COUNT(*) AS cnt], changelogMode=[I,UB,UA])
+      +- Exchange(distribution=[hash[a, c]], changelogMode=[I])
+         +- Calc(select=[a, c], changelogMode=[I])
+            +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d], changelogMode=[I])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[c, cnt], upsertMaterialize=[true])
++- Calc(select=[c, cnt])
+   +- GroupAggregate(groupBy=[a, c], select=[a, c, COUNT(*) AS cnt])
+      +- Exchange(distribution=[hash[a, c]])
+         +- Calc(select=[a, c])
+            +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testLocalGlobalAggAfterUnion">
     <Resource name="sql">
       <![CDATA[
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
index 12dd065..a7ab182 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
@@ -466,6 +466,141 @@ Calc(select=[person, sum_votes, prize, age])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testJoinOutputLostUpsertKeyWithSinkPk">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[city_name, customer_cnt])
++- LogicalProject(city_name=[$3], customer_cnt=[$1])
+   +- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+      :- LogicalAggregate(group=[{0}], customer_cnt=[COUNT()])
+      :  +- LogicalProject(city_id=[$1])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, source_customer]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, source_city]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[city_name, customer_cnt], changelogMode=[NONE])
++- Calc(select=[city_name, customer_cnt], changelogMode=[I,UB,UA,D])
+   +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, customer_cnt, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
+      :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UB,UA,D])
+      :  +- GroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS customer_cnt], changelogMode=[I,UB,UA,D])
+      :     +- Exchange(distribution=[hash[city_id]], changelogMode=[I,UB,UA,D])
+      :        +- Calc(select=[city_id], changelogMode=[I,UB,UA,D])
+      :           +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
+      :              +- Exchange(distribution=[hash[customer_id]], changelogMode=[I,UA,D])
+      :                 +- TableSourceScan(table=[[default_catalog, default_database, source_customer, project=[city_id, customer_id], metadata=[]]], fields=[city_id, customer_id], changelogMode=[I,UA,D])
+      +- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+         +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+            +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+               +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[I,UA,D])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[city_name, customer_cnt], upsertMaterialize=[true])
++- Calc(select=[city_name, customer_cnt])
+   +- Join(joinType=[InnerJoin], where=[(city_id = id)], select=[city_id, customer_cnt, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
+      :- Exchange(distribution=[hash[city_id]])
+      :  +- GroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS customer_cnt])
+      :     +- Exchange(distribution=[hash[city_id]])
+      :        +- Calc(select=[city_id])
+      :           +- ChangelogNormalize(key=[customer_id])
+      :              +- Exchange(distribution=[hash[customer_id]])
+      :                 +- TableSourceScan(table=[[default_catalog, default_database, source_customer, project=[city_id, customer_id], metadata=[]]], fields=[city_id, customer_id])
+      +- Exchange(distribution=[hash[id]])
+         +- ChangelogNormalize(key=[id])
+            +- Exchange(distribution=[hash[id]])
+               +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinOutputUpsertKeyInSinkPk">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[city_id, city_name, customer_cnt])
++- LogicalProject(city_id=[$0], city_name=[$3], customer_cnt=[$1])
+   +- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+      :- LogicalAggregate(group=[{0}], customer_cnt=[COUNT()])
+      :  +- LogicalProject(city_id=[$1])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, source_customer]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, source_city]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[city_id, city_name, customer_cnt], changelogMode=[NONE])
++- Calc(select=[city_id, city_name, customer_cnt], changelogMode=[I,UB,UA,D])
+   +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, customer_cnt, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
+      :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UB,UA,D])
+      :  +- GroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS customer_cnt], changelogMode=[I,UB,UA,D])
+      :     +- Exchange(distribution=[hash[city_id]], changelogMode=[I,UB,UA,D])
+      :        +- Calc(select=[city_id], changelogMode=[I,UB,UA,D])
+      :           +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
+      :              +- Exchange(distribution=[hash[customer_id]], changelogMode=[I,UA,D])
+      :                 +- TableSourceScan(table=[[default_catalog, default_database, source_customer, project=[city_id, customer_id], metadata=[]]], fields=[city_id, customer_id], changelogMode=[I,UA,D])
+      +- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+         +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+            +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+               +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[I,UA,D])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[city_id, city_name, customer_cnt])
++- Calc(select=[city_id, city_name, customer_cnt])
+   +- Join(joinType=[InnerJoin], where=[(city_id = id)], select=[city_id, customer_cnt, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
+      :- Exchange(distribution=[hash[city_id]])
+      :  +- GroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS customer_cnt])
+      :     +- Exchange(distribution=[hash[city_id]])
+      :        +- Calc(select=[city_id])
+      :           +- ChangelogNormalize(key=[customer_id])
+      :              +- Exchange(distribution=[hash[customer_id]])
+      :                 +- TableSourceScan(table=[[default_catalog, default_database, source_customer, project=[city_id, customer_id], metadata=[]]], fields=[city_id, customer_id])
+      +- Exchange(distribution=[hash[id]])
+         +- ChangelogNormalize(key=[id])
+            +- Exchange(distribution=[hash[id]])
+               +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinOutputUpsertKeyNotMatchSinkPk">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[city_id, city_name, customer_cnt])
++- LogicalProject(city_id=[$0], city_name=[$3], customer_cnt=[$1])
+   +- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+      :- LogicalAggregate(group=[{0}], customer_cnt=[COUNT()])
+      :  +- LogicalProject(city_id=[$1])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, source_customer]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, source_city]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[city_id, city_name, customer_cnt], changelogMode=[NONE])
++- Calc(select=[city_id, city_name, customer_cnt], changelogMode=[I,UB,UA,D])
+   +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, customer_cnt, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
+      :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UB,UA,D])
+      :  +- GroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS customer_cnt], changelogMode=[I,UB,UA,D])
+      :     +- Exchange(distribution=[hash[city_id]], changelogMode=[I,UB,UA,D])
+      :        +- Calc(select=[city_id], changelogMode=[I,UB,UA,D])
+      :           +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
+      :              +- Exchange(distribution=[hash[customer_id]], changelogMode=[I,UA,D])
+      :                 +- TableSourceScan(table=[[default_catalog, default_database, source_customer, project=[city_id, customer_id], metadata=[]]], fields=[city_id, customer_id], changelogMode=[I,UA,D])
+      +- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+         +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+            +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+               +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[I,UA,D])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[city_id, city_name, customer_cnt], upsertMaterialize=[true])
++- Calc(select=[city_id, city_name, customer_cnt])
+   +- Join(joinType=[InnerJoin], where=[(city_id = id)], select=[city_id, customer_cnt, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
+      :- Exchange(distribution=[hash[city_id]])
+      :  +- GroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS customer_cnt])
+      :     +- Exchange(distribution=[hash[city_id]])
+      :        +- Calc(select=[city_id])
+      :           +- ChangelogNormalize(key=[customer_id])
+      :              +- Exchange(distribution=[hash[customer_id]])
+      :                 +- TableSourceScan(table=[[default_catalog, default_database, source_customer, project=[city_id, customer_id], metadata=[]]], fields=[city_id, customer_id])
+      +- Exchange(distribution=[hash[id]])
+         +- ChangelogNormalize(key=[id])
+            +- Exchange(distribution=[hash[id]])
+               +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testJoinWithSort">
     <Resource name="sql">
       <![CDATA[
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
index 3527d0b..3963652 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
@@ -19,10 +19,8 @@ package org.apache.flink.table.planner.plan.stream.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
-import org.apache.flink.table.api.internal.TableEnvironmentInternal
 import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
 import org.apache.flink.table.planner.utils.TableTestBase
-import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
 
 import org.junit.Test
 
@@ -881,5 +879,95 @@ class RankTest extends TableTestBase {
     util.verifyExecPlan(stmtSet)
   }
 
+  @Test
+  def testRankOutputUpsertKeyNotMatchSinkPk(): Unit = {
+    // test for FLINK-20370
+    util.tableEnv.executeSql(
+      """
+        |CREATE TABLE sink (
+        | a INT,
+        | b VARCHAR,
+        | c BIGINT,
+        | PRIMARY KEY (a) NOT ENFORCED
+        |) WITH (
+        | 'connector' = 'values'
+        | ,'sink-insert-only' = 'false'
+        |)
+        |""".stripMargin)
+
+    val sql =
+      """
+        |INSERT INTO sink
+        |SELECT a, b, c FROM (
+        |  SELECT *, ROW_NUMBER() OVER (PARTITION BY b ORDER BY c DESC) AS rn
+        |  FROM MyTable
+        |  )
+        |WHERE rn <= 100
+        |""".stripMargin
+    // verify that UB messages are kept and upsertMaterialize is added if the rank output's
+    // upsert keys differ from the sink's pks
+    util.verifyExplainInsert(sql, ExplainDetail.CHANGELOG_MODE)
+  }
+
+  @Test
+  def testRankOutputUpsertKeyInSinkPk(): Unit = {
+    // test for FLINK-20370
+    util.tableEnv.executeSql(
+      """
+        |CREATE TABLE sink (
+        | a INT,
+        | b VARCHAR,
+        | c BIGINT,
+        | PRIMARY KEY (a, b) NOT ENFORCED
+        |) WITH (
+        | 'connector' = 'values'
+        | ,'sink-insert-only' = 'false'
+        |)
+        |""".stripMargin)
+
+    val sql =
+      """
+        |INSERT INTO sink
+        |SELECT a, b, c FROM (
+        |  SELECT *, ROW_NUMBER() OVER (PARTITION BY a ORDER BY c DESC) AS rn
+        |  FROM MyTable
+        |  )
+        |WHERE rn <= 100
+        |""".stripMargin
+
+    // verify that UB messages are kept and no upsertMaterialize is added if the rank output's
+    // upsert keys are a subset of the sink's pks
+    util.verifyExplainInsert(sql, ExplainDetail.CHANGELOG_MODE)
+  }
+
+  @Test
+  def testRankOutputLostUpsertKeyWithSinkPk(): Unit = {
+    // test for FLINK-20370
+    util.tableEnv.executeSql(
+      """
+        |CREATE TABLE sink (
+        | a INT,
+        | c BIGINT,
+        | rn BIGINT,
+        | PRIMARY KEY (a) NOT ENFORCED
+        |) WITH (
+        | 'connector' = 'values'
+        | ,'sink-insert-only' = 'false'
+        |)
+        |""".stripMargin)
+
+    val sql =
+      """
+        |INSERT INTO sink
+        |SELECT a, c, rn FROM (
+        |  SELECT *, ROW_NUMBER() OVER (PARTITION BY b ORDER BY c DESC) AS rn
+        |  FROM MyTable
+        |  )
+        |WHERE rn <= 100
+        |""".stripMargin
+    // verify that UB messages are kept and upsertMaterialize is added if the rank output loses its upsert keys
+    util.verifyExplainInsert(sql, ExplainDetail.CHANGELOG_MODE)
+  }
+
   // TODO add tests about multi-sinks and udf
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
index 54a01be..883415d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
@@ -296,4 +296,70 @@ class AggregateTest extends TableTestBase {
          |) t
          |""".stripMargin)
   }
+
+  @Test
+  def testGroupKeyNotMatchSinkPk(): Unit = {
+    // test for FLINK-20370
+    util.tableEnv.executeSql(
+      """
+        |CREATE TABLE sink (
+        | id VARCHAR,
+        | cnt BIGINT,
+        | PRIMARY KEY (cnt) NOT ENFORCED
+        |) WITH (
+        | 'connector' = 'values'
+        | ,'sink-insert-only' = 'false'
+        |)
+        |""".stripMargin)
+    util.verifyExplainInsert(
+      """
+        |INSERT INTO sink
+        |SELECT c, COUNT(*) cnt FROM T GROUP BY c
+        |""".stripMargin, ExplainDetail.CHANGELOG_MODE)
+  }
+
+  @Test
+  def testGroupKeyInSinkPk(): Unit = {
+    // test for FLINK-20370
+    util.tableEnv.executeSql(
+      """
+        |CREATE TABLE sink (
+        | a INT,
+        | b BIGINT,
+        | cnt BIGINT,
+        | PRIMARY KEY (a, b) NOT ENFORCED
+        |) WITH (
+        | 'connector' = 'values'
+        | ,'sink-insert-only' = 'false'
+        |)
+        |""".stripMargin)
+    util.verifyExplainInsert(
+      """
+        |INSERT INTO sink
+        |SELECT a, MAX(b) b, COUNT(*) cnt FROM T GROUP BY a
+        |""".stripMargin, ExplainDetail.CHANGELOG_MODE)
+  }
+
+  @Test
+  def testGroupResultLostUpsertKeyWithSinkPk(): Unit = {
+    // test for FLINK-20370
+    util.tableEnv.executeSql(
+      """
+        |CREATE TABLE sink (
+        | id VARCHAR,
+        | cnt BIGINT,
+        | PRIMARY KEY (id) NOT ENFORCED
+        |) WITH (
+        | 'connector' = 'values'
+        | ,'sink-insert-only' = 'false'
+        |)
+        |""".stripMargin)
+
+    // verify that UB messages are kept and upsertMaterialize is added if the group result loses its upsert keys
+    util.verifyExplainInsert(
+      """
+        |INSERT INTO sink
+        |SELECT c, COUNT(*) cnt FROM T GROUP BY a, c
+        |""".stripMargin, ExplainDetail.CHANGELOG_MODE)
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
index fd70df2..881667a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
@@ -333,4 +333,164 @@ class JoinTest extends TableTestBase {
         | WHERE T1.person = T2.person
         |""".stripMargin)
   }
+
+  @Test
+  def testJoinOutputUpsertKeyNotMatchSinkPk(): Unit = {
+    // test for FLINK-20370
+    util.tableEnv.executeSql(
+      """
+        |create table source_city (
+        | id varchar,
+        | city_name varchar,
+        | primary key (id) not enforced
+        |) with (
+        | 'connector' = 'values',
+        | 'changelog-mode' = 'I,UA,D'
+        |)
+        |""".stripMargin)
+
+    util.tableEnv.executeSql(
+      """
+        |create table source_customer (
+        | customer_id varchar,
+        | city_id varchar,
+        | age int,
+        | gender varchar,
+        | update_time timestamp(3),
+        | primary key (customer_id) not enforced
+        |) with (
+        | 'connector' = 'values',
+        | 'changelog-mode' = 'I,UA,D'
+        |)
+        |""".stripMargin)
+
+    util.tableEnv.executeSql(
+      """
+        |create table sink (
+        | city_id varchar,
+        | city_name varchar,
+        | customer_cnt bigint,
+        | primary key (city_name) not enforced
+        |) with (
+        | 'connector' = 'values'
+        | ,'sink-insert-only' = 'false'
+        |)
+        |""".stripMargin)
+
+    // verify that UB messages are kept and upsertMaterialize is added if the join output's
+    // upsert keys differ from the sink's pks
+    util.verifyExplainInsert(
+      """
+        |insert into sink
+        |select t1.city_id, t2.city_name, t1.customer_cnt
+        | from (select city_id, count(*) customer_cnt from source_customer group by city_id) t1
+        | join source_city t2 on t1.city_id = t2.id
+        |""".stripMargin, ExplainDetail.CHANGELOG_MODE)
+  }
+
+  @Test
+  def testJoinOutputUpsertKeyInSinkPk(): Unit = {
+    // test for FLINK-20370
+    util.tableEnv.executeSql(
+      """
+        |create table source_city (
+        | id varchar,
+        | city_name varchar,
+        | primary key (id) not enforced
+        |) with (
+        | 'connector' = 'values',
+        | 'changelog-mode' = 'I,UA,D'
+        |)
+        |""".stripMargin)
+
+    util.tableEnv.executeSql(
+      """
+        |create table source_customer (
+        | customer_id varchar,
+        | city_id varchar,
+        | age int,
+        | gender varchar,
+        | update_time timestamp(3),
+        | primary key (customer_id) not enforced
+        |) with (
+        | 'connector' = 'values',
+        | 'changelog-mode' = 'I,UA,D'
+        |)
+        |""".stripMargin)
+
+    util.tableEnv.executeSql(
+      """
+        |create table sink (
+        | city_id varchar,
+        | city_name varchar,
+        | customer_cnt bigint,
+        | primary key (city_id, city_name) not enforced
+        |) with (
+        | 'connector' = 'values'
+        | ,'sink-insert-only' = 'false'
+        |)
+        |""".stripMargin)
+
+    // verify that UB messages are kept and no upsertMaterialize is added if the join output's
+    // upsert keys are a subset of the sink's pks
+    util.verifyExplainInsert(
+      """
+        |insert into sink
+        |select t1.city_id, t2.city_name, t1.customer_cnt
+        | from (select city_id, count(*) customer_cnt from source_customer group by city_id) t1
+        | join source_city t2 on t1.city_id = t2.id
+        |""".stripMargin, ExplainDetail.CHANGELOG_MODE)
+  }
+
+  @Test
+  def testJoinOutputLostUpsertKeyWithSinkPk(): Unit = {
+    // test for FLINK-20370
+    util.tableEnv.executeSql(
+      """
+        |create table source_city (
+        | id varchar,
+        | city_name varchar,
+        | primary key (id) not enforced
+        |) with (
+        | 'connector' = 'values',
+        | 'changelog-mode' = 'I,UA,D'
+        |)
+        |""".stripMargin)
+
+    util.tableEnv.executeSql(
+      """
+        |create table source_customer (
+        | customer_id varchar,
+        | city_id varchar,
+        | age int,
+        | gender varchar,
+        | update_time timestamp(3),
+        | primary key (customer_id) not enforced
+        |) with (
+        | 'connector' = 'values',
+        | 'changelog-mode' = 'I,UA,D'
+        |)
+        |""".stripMargin)
+
+    util.tableEnv.executeSql(
+      """
+        |create table sink (
+        | city_name varchar,
+        | customer_cnt bigint,
+        | primary key (city_name) not enforced
+        |) with (
+        | 'connector' = 'values'
+        | ,'sink-insert-only' = 'false'
+        |)
+        |""".stripMargin)
+
+    // verify that UB messages are kept and upsertMaterialize is added if the join output loses its upsert keys
+    util.verifyExplainInsert(
+      """
+        |insert into sink
+        |select t2.city_name, t1.customer_cnt
+        | from (select city_id, count(*) customer_cnt from source_customer group by city_id) t1
+        | join source_city t2 on t1.city_id = t2.id
+        |""".stripMargin, ExplainDetail.CHANGELOG_MODE)
+  }
 }