You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2021/12/08 09:04:09 UTC
[flink] branch release-1.14 updated: [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 7c5ddbd [FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key
7c5ddbd is described below
commit 7c5ddbd201005e55ab68b4db7ee74c7cbeb13400
Author: lincoln lee <li...@gmail.com>
AuthorDate: Wed Dec 8 17:03:37 2021 +0800
[FLINK-20370][table] part1: Fix wrong results when sink primary key is not the same with query result's changelog upsert key
This closes #18048
---
.../kafka/table/KafkaChangelogTableITCase.java | 4 +
.../nodes/physical/stream/StreamPhysicalSink.scala | 61 ++----
.../FlinkChangelogModeInferenceProgram.scala | 111 ++++++++--
.../table/planner/plan/stream/sql/RankTest.xml | 225 ++++++++++++++-------
.../planner/plan/stream/sql/agg/AggregateTest.xml | 75 +++++++
.../planner/plan/stream/sql/join/JoinTest.xml | 135 +++++++++++++
.../table/planner/plan/stream/sql/RankTest.scala | 92 ++++++++-
.../plan/stream/sql/agg/AggregateTest.scala | 66 ++++++
.../planner/plan/stream/sql/join/JoinTest.scala | 160 +++++++++++++++
9 files changed, 795 insertions(+), 134 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
index 501d470..62a7d63 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
@@ -302,10 +302,12 @@ public class KafkaChangelogTableITCase extends KafkaTableTestBase {
List<String> expected =
Arrays.asList(
+ "+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:38:35.477, 2020-05-13T12:38:35, 12-pack drill bits]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:38:35.477, 2020-05-13T12:38:35, spare tire]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:06.301, 2020-05-13T12:39:06, hammer]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:09.489, 2020-05-13T12:39:09, rocks]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:39:18.230, 2020-05-13T12:39:18, jacket]",
+ "+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:42:33.939, 2020-05-13T12:42:33, car battery]",
"+I[changelog_canal, inventory, products2, {name=12, weight=7, description=12, id=4}, [id], 2020-05-13T12:42:33.939, 2020-05-13T12:42:33, scooter]");
waitingExpectedResults("sink", expected, Duration.ofSeconds(10));
@@ -435,10 +437,12 @@ public class KafkaChangelogTableITCase extends KafkaTableTestBase {
List<String> expected =
Arrays.asList(
+ "+I[changelog_maxwell, test, product, null, 2020-08-06T03:34:43, 12-pack drill bits]",
"+I[changelog_maxwell, test, product, null, 2020-08-06T03:34:43, spare tire]",
"+I[changelog_maxwell, test, product, null, 2020-08-06T03:34:53, hammer]",
"+I[changelog_maxwell, test, product, null, 2020-08-06T03:34:57, rocks]",
"+I[changelog_maxwell, test, product, null, 2020-08-06T03:35:06, jacket]",
+ "+I[changelog_maxwell, test, product, null, 2020-08-06T03:35:28, car battery]",
"+I[changelog_maxwell, test, product, null, 2020-08-06T03:35:28, scooter]");
waitingExpectedResults("sink", expected, Duration.ofSeconds(10));
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
index 58d540e..4c0984b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
@@ -18,31 +18,23 @@
package org.apache.flink.table.planner.plan.nodes.physical.stream
-import org.apache.flink.table.api.config.ExecutionConfigOptions
-import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
import org.apache.flink.table.catalog.{ObjectIdentifier, ResolvedCatalogTable}
import org.apache.flink.table.connector.sink.DynamicTableSink
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec
-import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.calcite.Sink
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink
import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, FlinkRelOptUtil, RelDescriptionWriterImpl}
-import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
-import org.apache.flink.types.RowKind
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.hint.RelHint
-import org.apache.calcite.util.ImmutableBitSet
import java.io.{PrintWriter, StringWriter}
import java.util
-import scala.collection.JavaConversions._
-
/**
 * Stream physical RelNode to write data into an external sink defined by a
* [[DynamicTableSink]].
@@ -55,7 +47,8 @@ class StreamPhysicalSink(
tableIdentifier: ObjectIdentifier,
catalogTable: ResolvedCatalogTable,
tableSink: DynamicTableSink,
- abilitySpecs: Array[SinkAbilitySpec])
+ abilitySpecs: Array[SinkAbilitySpec],
+ upsertMaterialize: Boolean = false)
extends Sink(cluster, traitSet, inputRel, hints, tableIdentifier, catalogTable, tableSink)
with StreamPhysicalRel {
@@ -70,7 +63,21 @@ class StreamPhysicalSink(
tableIdentifier,
catalogTable,
tableSink,
- abilitySpecs)
+ abilitySpecs,
+ upsertMaterialize)
+ }
+
+ def copy(newUpsertMaterialize: Boolean): StreamPhysicalSink = {
+ new StreamPhysicalSink(
+ cluster,
+ traitSet,
+ inputRel,
+ hints,
+ tableIdentifier,
+ catalogTable,
+ tableSink,
+ abilitySpecs,
+ newUpsertMaterialize)
}
override def translateToExecNode(): ExecNode[_] = {
@@ -83,40 +90,6 @@ class StreamPhysicalSink(
tableSinkSpec.setTableSink(tableSink)
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
tableSinkSpec.setReadableConfig(tableConfig.getConfiguration)
-
- val primaryKeys = toScala(catalogTable.getResolvedSchema
- .getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
-
- val upsertMaterialize = tableConfig.getConfiguration.get(
- ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE) match {
- case UpsertMaterialize.FORCE => primaryKeys.nonEmpty
- case UpsertMaterialize.NONE => false
- case UpsertMaterialize.AUTO =>
- val insertOnly = tableSink
- .getChangelogMode(inputChangelogMode)
- .containsOnly(RowKind.INSERT)
-
- if (!insertOnly && primaryKeys.nonEmpty) {
- val columnNames = catalogTable.getResolvedSchema.getColumnNames
- val pks = ImmutableBitSet.of(primaryKeys.map(columnNames.indexOf): _*)
-
- val fmq = FlinkRelMetadataQuery.reuseOrCreate(getCluster.getMetadataQuery)
- val uniqueKeys = fmq.getUniqueKeys(getInput)
- val changeLogUpsertKeys = fmq.getUpsertKeys(getInput)
-
- if (uniqueKeys != null &&
- uniqueKeys.exists(pks.contains) &&
- !(changeLogUpsertKeys != null &&
- changeLogUpsertKeys.exists(pks.contains))) {
- true
- } else {
- false
- }
- } else {
- false
- }
- }
-
new StreamExecSink(
tableSinkSpec,
inputChangelogMode,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index e0e9a0f..30b1432 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -19,15 +19,21 @@
package org.apache.flink.table.planner.plan.optimize.program
import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
import org.apache.flink.table.connector.ChangelogMode
+import org.apache.flink.table.planner.calcite.FlinkContext
import org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{BEFORE_AND_AFTER, ONLY_UPDATE_AFTER, beforeAfterOrNone, onlyAfterOrNone}
import org.apache.flink.table.planner.plan.`trait`._
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.physical.stream._
import org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFastStrategy, RetractStrategy, UpdateFastStrategy}
import org.apache.flink.table.planner.plan.utils._
import org.apache.flink.table.planner.sinks.DataStreamTableSink
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
import org.apache.flink.table.runtime.operators.join.FlinkJoinType
import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, StreamTableSink, UpsertStreamTableSink}
+import org.apache.flink.types.RowKind
import org.apache.calcite.rel.RelNode
import org.apache.calcite.util.ImmutableBitSet
@@ -45,7 +51,6 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
override def optimize(
root: RelNode,
context: StreamOptimizeContext): RelNode = {
-
// step1: satisfy ModifyKindSet trait
val physicalRoot = root.asInstanceOf[StreamPhysicalRel]
val rootWithModifyKindSet = SATISFY_MODIFY_KIND_SET_TRAIT_VISITOR.visit(
@@ -428,19 +433,9 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
rel: StreamPhysicalRel,
requiredTrait: UpdateKindTrait): Option[StreamPhysicalRel] = rel match {
case sink: StreamPhysicalSink =>
- val childModifyKindSet = getModifyKindSet(sink.getInput)
- val onlyAfter = onlyAfterOrNone(childModifyKindSet)
- val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
- val sinkTrait = UpdateKindTrait.fromChangelogMode(
- sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
- val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
- Seq(onlyAfter, beforeAndAfter)
- } else if (sinkTrait.equals(BEFORE_AND_AFTER)){
- Seq(beforeAndAfter)
- } else {
- Seq(UpdateKindTrait.NONE)
- }
- visitSink(sink, sinkRequiredTraits)
+ val sinkRequiredTraits = inferSinkRequiredTraits(sink)
+ val upsertMaterialize = analyzeUpsertMaterializeStrategy(sink)
+ visitSink(sink.copy(upsertMaterialize), sinkRequiredTraits)
case sink: StreamPhysicalLegacySink[_] =>
val childModifyKindSet = getModifyKindSet(sink.getInput)
@@ -759,6 +754,94 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
Some(sink.copy(sinkTrait, children.head).asInstanceOf[StreamPhysicalRel])
}
}
+
+ /**
+ * Infer sink required traits by the sink node and its input. The sink required traits are based
+ * on the sink node's changelog mode; the only exception is when the sink's pk(s) are not exactly
+ * the same as the changeLogUpsertKeys and the sink's changelog mode is ONLY_UPDATE_AFTER.
+ */
+ private def inferSinkRequiredTraits(sink: StreamPhysicalSink): Seq[UpdateKindTrait] = {
+ val childModifyKindSet = getModifyKindSet(sink.getInput)
+ val onlyAfter = onlyAfterOrNone(childModifyKindSet)
+ val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
+ val sinkTrait = UpdateKindTrait.fromChangelogMode(
+ sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode))
+
+ val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
+ // if the sink's pk(s) do not exactly match the input changeLogUpsertKeys then it will fall
+ // back to beforeAndAfter mode for correctness
+ var requireBeforeAndAfter: Boolean = false
+ val sinkDefinedPks = toScala(sink.catalogTable.getResolvedSchema
+ .getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+
+ if (sinkDefinedPks.nonEmpty) {
+ val sinkColumns = sink.catalogTable.getResolvedSchema.getColumnNames
+ val sinkPks = ImmutableBitSet.of(sinkDefinedPks.map(sinkColumns.indexOf): _*)
+ val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+ val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+ // if input is UA only, primary key != upsert key (upsert key can be null) we should
+ // fall back to beforeAndAfter.
+ // Notice: even if the sink pk(s) contain the input upsert key we cannot optimize to UA only,
+ // this differs from the batch job's unique key inference
+ if (changeLogUpsertKeys == null || !changeLogUpsertKeys.exists {_.equals(sinkPks)}) {
+ requireBeforeAndAfter = true
+ }
+ }
+ if (requireBeforeAndAfter) {
+ Seq(beforeAndAfter)
+ } else {
+ Seq(onlyAfter, beforeAndAfter)
+ }
+ } else if (sinkTrait.equals(BEFORE_AND_AFTER)){
+ Seq(beforeAndAfter)
+ } else {
+ Seq(UpdateKindTrait.NONE)
+ }
+ sinkRequiredTraits
+ }
+
+ /**
+ * Analyze whether to enable upsertMaterialize or not. It will return true in these cases:
+ * 1. when `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` is set to FORCE and the sink's primary key is
+ * nonempty.
+ * 2. when `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` is set to AUTO and the sink's primary key doesn't
+ * contain the upsertKeys of the input update stream.
+ */
+ private def analyzeUpsertMaterializeStrategy(sink: StreamPhysicalSink): Boolean = {
+ val tableConfig = sink.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext])
+ .getTableConfig
+ val inputChangelogMode = ChangelogPlanUtils.getChangelogMode(
+ sink.getInput.asInstanceOf[StreamPhysicalRel]).get
+ val catalogTable = sink.catalogTable
+ val primaryKeys = toScala(catalogTable.getResolvedSchema
+ .getPrimaryKey).map(_.getColumns).map(toScala[String]).getOrElse(Seq())
+ val upsertMaterialize = tableConfig.getConfiguration.get(
+ ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE) match {
+ case UpsertMaterialize.FORCE => primaryKeys.nonEmpty
+ case UpsertMaterialize.NONE => false
+ case UpsertMaterialize.AUTO =>
+ val sinkAcceptInsertOnly = sink.tableSink.getChangelogMode(inputChangelogMode)
+ .containsOnly(RowKind.INSERT)
+ val inputInsertOnly = inputChangelogMode.containsOnly(RowKind.INSERT)
+
+ if (!sinkAcceptInsertOnly && !inputInsertOnly && primaryKeys.nonEmpty) {
+ val columnNames = catalogTable.getResolvedSchema.getColumnNames
+ val pks = ImmutableBitSet.of(primaryKeys.map(columnNames.indexOf): _*)
+ val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+ val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+ // if input has updates and primary key != upsert key (upsert key can be null) we should
+ // enable upsertMaterialize. An optimization is: do not enable upsertMaterialize when the
+ // sink pk(s) contain the input changeLogUpsertKeys
+ if (changeLogUpsertKeys == null || !changeLogUpsertKeys.exists(pks.contains)) {
+ true
+ } else {
+ false
+ }
+ } else {
+ false
+ }
+ }
+ upsertMaterialize
+ }
}
// -------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
index 5b0f3d9..1b9c826 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
@@ -224,6 +224,83 @@ Calc(select=[a, rk, b, c])
]]>
</Resource>
</TestCase>
+ <TestCase name="testRankOutputLostUpsertKeyWithSinkPk">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, c, rn])
++- LogicalProject(a=[$0], c=[$2], rn=[$5])
+ +- LogicalFilter(condition=[<=($5, 100)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rn=[ROW_NUMBER() OVER (PARTITION BY $1 ORDER BY $2 DESC NULLS LAST)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[a, c, w0$o0], changelogMode=[NONE])
++- Calc(select=[a, c, w0$o0], changelogMode=[I,UB,UA,D])
+ +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[b], orderBy=[c DESC], select=[a, b, c, w0$o0], changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[b]], changelogMode=[I])
+ +- Calc(select=[a, b, c], changelogMode=[I])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[a, c, w0$o0], upsertMaterialize=[true])
++- Calc(select=[a, c, w0$o0])
+ +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[b], orderBy=[c DESC], select=[a, b, c, w0$o0])
+ +- Exchange(distribution=[hash[b]])
+ +- Calc(select=[a, b, c])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRankOutputUpsertKeyInSinkPk">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalFilter(condition=[<=($5, 100)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC NULLS LAST)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c], changelogMode=[NONE])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[a], orderBy=[c DESC], select=[a, b, c], changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[a]], changelogMode=[I])
+ +- Calc(select=[a, b, c], changelogMode=[I])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c], upsertMaterialize=[true])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[a], orderBy=[c DESC], select=[a, b, c])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, b, c])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRankOutputUpsertKeyNotMatchSinkPk">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalFilter(condition=[<=($5, 100)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rn=[ROW_NUMBER() OVER (PARTITION BY $1 ORDER BY $2 DESC NULLS LAST)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c], changelogMode=[NONE])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[b], orderBy=[c DESC], select=[a, b, c], changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[b]], changelogMode=[I])
+ +- Calc(select=[a, b, c], changelogMode=[I])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c], upsertMaterialize=[true])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[b], orderBy=[c DESC], select=[a, b, c])
+ +- Exchange(distribution=[hash[b]])
+ +- Calc(select=[a, b, c])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testRankWithAnotherRankAsInput">
<Resource name="sql">
<![CDATA[
@@ -925,6 +1002,45 @@ Calc(select=[a, b, c, 10:BIGINT AS $3], changelogMode=[I,UA,D])
]]>
</Resource>
</TestCase>
+ <TestCase name="testTopNWithGroupByConstantKey">
+ <Resource name="sql">
+ <![CDATA[
+SELECT *
+FROM (
+ SELECT a, b, count_c,
+ ROW_NUMBER() OVER (PARTITION BY a ORDER BY count_c DESC) AS row_num
+ FROM (
+SELECT a, b, COUNT(*) AS count_c
+FROM (
+SELECT *, 'cn' AS cn
+FROM MyTable
+)
+GROUP BY a, b, cn
+ ))
+WHERE row_num <= 10
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], count_c=[$2], row_num=[$3])
++- LogicalFilter(condition=[<=($3, 10)])
+ +- LogicalProject(a=[$0], b=[$1], count_c=[$3], row_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST)])
+ +- LogicalAggregate(group=[{0, 1, 2}], count_c=[COUNT()])
+ +- LogicalProject(a=[$0], b=[$1], cn=[_UTF-16LE'cn'])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, count_c, w0$o0], changelogMode=[I,UA,D])
++- Exchange(distribution=[hash[a]], changelogMode=[I,UA])
+ +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(*) AS count_c], changelogMode=[I,UA])
+ +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
+ +- Calc(select=[a, b], changelogMode=[I])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testTopNWithKeyChanged">
<Resource name="sql">
<![CDATA[
@@ -992,45 +1108,6 @@ Calc(select=[row_num, a, c], where=[IS NOT NULL(b)], changelogMode=[I,UB,UA,D])
]]>
</Resource>
</TestCase>
- <TestCase name="testTopNWithGroupByConstantKey">
- <Resource name="sql">
- <![CDATA[
-SELECT *
-FROM (
- SELECT a, b, count_c,
- ROW_NUMBER() OVER (PARTITION BY a ORDER BY count_c DESC) AS row_num
- FROM (
-SELECT a, b, COUNT(*) AS count_c
-FROM (
-SELECT *, 'cn' AS cn
-FROM MyTable
-)
-GROUP BY a, b, cn
- ))
-WHERE row_num <= 10
- ]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
-LogicalProject(a=[$0], b=[$1], count_c=[$2], row_num=[$3])
-+- LogicalFilter(condition=[<=($3, 10)])
- +- LogicalProject(a=[$0], b=[$1], count_c=[$3], row_num=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $3 DESC NULLS LAST)])
- +- LogicalAggregate(group=[{0, 1, 2}], count_c=[COUNT()])
- +- LogicalProject(a=[$0], b=[$1], cn=[_UTF-16LE'cn'])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
- </Resource>
- <Resource name="optimized rel plan">
- <![CDATA[
-Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[count_c DESC], select=[a, b, count_c, w0$o0], changelogMode=[I,UA,D])
-+- Exchange(distribution=[hash[a]], changelogMode=[I,UA])
- +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(*) AS count_c], changelogMode=[I,UA])
- +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
- +- Calc(select=[a, b], changelogMode=[I])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime], changelogMode=[I])
-]]>
- </Resource>
- </TestCase>
<TestCase name="testUnarySortTopNOnString">
<Resource name="sql">
<![CDATA[
@@ -1065,41 +1142,6 @@ Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1,
]]>
</Resource>
</TestCase>
- <TestCase name="testUpdatableRankAfterIntermediateScan">
- <Resource name="ast">
- <![CDATA[
-LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
-+- LogicalProject(a=[$0], b=[$1], c=[$2])
- +- LogicalAggregate(group=[{0}], b=[MAX($1)], c=[MIN($2)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-
-LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
-+- LogicalProject(a=[$0], b=[$1], c=[$2])
- +- LogicalFilter(condition=[<($3, 3)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $1 DESC NULLS LAST)])
- +- LogicalAggregate(group=[{0}], b=[MAX($1)], c=[MIN($2)])
- +- LogicalProject(a=[$0], b=[$1], c=[$2])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-]]>
- </Resource>
- <Resource name="optimized exec plan">
- <![CDATA[
-GroupAggregate(groupBy=[a], select=[a, MAX(b) AS b, MIN(c) AS c])(reuse_id=[1])
-+- Exchange(distribution=[hash[a]])
- +- Calc(select=[a, b, c])
- +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
-
-Sink(table=[default_catalog.default_database.sink], fields=[a, b, c])
-+- Reused(reference_id=[1])
-
-Sink(table=[default_catalog.default_database.sink], fields=[a, b, c])
-+- Rank(strategy=[UpdateFastStrategy[0]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[b DESC], select=[a, b, c])
- +- Exchange(distribution=[hash[a]])
- +- Reused(reference_id=[1])
-]]>
- </Resource>
- </TestCase>
<TestCase name="testUpdatableRankWithDeduplicate">
<Resource name="sql">
<![CDATA[
@@ -1137,4 +1179,39 @@ Rank(strategy=[UpdateFastStrategy[0,1]], rankType=[ROW_NUMBER], rankRange=[rankS
]]>
</Resource>
</TestCase>
+ <TestCase name="testUpdatableRankAfterIntermediateScan">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalAggregate(group=[{0}], b=[MAX($1)], c=[MIN($2)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalFilter(condition=[<($3, 3)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $1 DESC NULLS LAST)])
+ +- LogicalAggregate(group=[{0}], b=[MAX($1)], c=[MIN($2)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+GroupAggregate(groupBy=[a], select=[a, MAX(b) AS b, MIN(c) AS c])(reuse_id=[1])
++- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, b, c])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- Reused(reference_id=[1])
+
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, c])
++- Rank(strategy=[UpdateFastStrategy[0]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=2], partitionBy=[a], orderBy=[b DESC], select=[a, b, c])
+ +- Exchange(distribution=[hash[a]])
+ +- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index c78b886..ebc6f06 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -316,6 +316,81 @@ Calc(select=[EXPR$0])
]]>
</Resource>
</TestCase>
+ <TestCase name="testGroupKeyInSinkPk">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[a, b, cnt])
++- LogicalAggregate(group=[{0}], b=[MAX($1)], cnt=[COUNT()])
+ +- LogicalProject(a=[$0], b=[$1])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, cnt], changelogMode=[NONE])
++- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS b, COUNT(*) AS cnt], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[a]], changelogMode=[I])
+ +- Calc(select=[a, b], changelogMode=[I])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d], changelogMode=[I])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[a, b, cnt])
++- GroupAggregate(groupBy=[a], select=[a, MAX(b) AS b, COUNT(*) AS cnt])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, b])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testGroupKeyNotMatchSinkPk">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[c, cnt])
++- LogicalAggregate(group=[{0}], cnt=[COUNT()])
+ +- LogicalProject(c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[c, cnt], changelogMode=[NONE])
++- GroupAggregate(groupBy=[c], select=[c, COUNT(*) AS cnt], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[c]], changelogMode=[I])
+ +- Calc(select=[c], changelogMode=[I])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d], changelogMode=[I])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[c, cnt], upsertMaterialize=[true])
++- GroupAggregate(groupBy=[c], select=[c, COUNT(*) AS cnt])
+ +- Exchange(distribution=[hash[c]])
+ +- Calc(select=[c])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testGroupResultLostUpsertKeyWithSinkPk">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[c, cnt])
++- LogicalProject(c=[$1], cnt=[$2])
+ +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT()])
+ +- LogicalProject(a=[$0], c=[$2])
+ +- LogicalTableScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[c, cnt], changelogMode=[NONE])
++- Calc(select=[c, cnt], changelogMode=[I,UB,UA])
+ +- GroupAggregate(groupBy=[a, c], select=[a, c, COUNT(*) AS cnt], changelogMode=[I,UB,UA])
+ +- Exchange(distribution=[hash[a, c]], changelogMode=[I])
+ +- Calc(select=[a, c], changelogMode=[I])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d], changelogMode=[I])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[c, cnt], upsertMaterialize=[true])
++- Calc(select=[c, cnt])
+ +- GroupAggregate(groupBy=[a, c], select=[a, c, COUNT(*) AS cnt])
+ +- Exchange(distribution=[hash[a, c]])
+ +- Calc(select=[a, c])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, T, source: [TestTableSource(a, b, c, d)]]], fields=[a, b, c, d])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testLocalGlobalAggAfterUnion">
<Resource name="sql">
<![CDATA[
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
index 12dd065..a7ab182 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
@@ -466,6 +466,141 @@ Calc(select=[person, sum_votes, prize, age])
]]>
</Resource>
</TestCase>
+ <TestCase name="testJoinOutputLostUpsertKeyWithSinkPk">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[city_name, customer_cnt])
++- LogicalProject(city_name=[$3], customer_cnt=[$1])
+ +- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+ :- LogicalAggregate(group=[{0}], customer_cnt=[COUNT()])
+ : +- LogicalProject(city_id=[$1])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, source_customer]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source_city]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[city_name, customer_cnt], changelogMode=[NONE])
++- Calc(select=[city_name, customer_cnt], changelogMode=[I,UB,UA,D])
+ +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, customer_cnt, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
+ :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UB,UA,D])
+ : +- GroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS customer_cnt], changelogMode=[I,UB,UA,D])
+ : +- Exchange(distribution=[hash[city_id]], changelogMode=[I,UB,UA,D])
+ : +- Calc(select=[city_id], changelogMode=[I,UB,UA,D])
+ : +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
+ : +- Exchange(distribution=[hash[customer_id]], changelogMode=[I,UA,D])
+ : +- TableSourceScan(table=[[default_catalog, default_database, source_customer, project=[city_id, customer_id], metadata=[]]], fields=[city_id, customer_id], changelogMode=[I,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+ +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[I,UA,D])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[city_name, customer_cnt], upsertMaterialize=[true])
++- Calc(select=[city_name, customer_cnt])
+ +- Join(joinType=[InnerJoin], where=[(city_id = id)], select=[city_id, customer_cnt, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
+ :- Exchange(distribution=[hash[city_id]])
+ : +- GroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS customer_cnt])
+ : +- Exchange(distribution=[hash[city_id]])
+ : +- Calc(select=[city_id])
+ : +- ChangelogNormalize(key=[customer_id])
+ : +- Exchange(distribution=[hash[customer_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, source_customer, project=[city_id, customer_id], metadata=[]]], fields=[city_id, customer_id])
+ +- Exchange(distribution=[hash[id]])
+ +- ChangelogNormalize(key=[id])
+ +- Exchange(distribution=[hash[id]])
+ +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinOutputUpsertKeyInSinkPk">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[city_id, city_name, customer_cnt])
++- LogicalProject(city_id=[$0], city_name=[$3], customer_cnt=[$1])
+ +- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+ :- LogicalAggregate(group=[{0}], customer_cnt=[COUNT()])
+ : +- LogicalProject(city_id=[$1])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, source_customer]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source_city]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[city_id, city_name, customer_cnt], changelogMode=[NONE])
++- Calc(select=[city_id, city_name, customer_cnt], changelogMode=[I,UB,UA,D])
+ +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, customer_cnt, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
+ :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UB,UA,D])
+ : +- GroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS customer_cnt], changelogMode=[I,UB,UA,D])
+ : +- Exchange(distribution=[hash[city_id]], changelogMode=[I,UB,UA,D])
+ : +- Calc(select=[city_id], changelogMode=[I,UB,UA,D])
+ : +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
+ : +- Exchange(distribution=[hash[customer_id]], changelogMode=[I,UA,D])
+ : +- TableSourceScan(table=[[default_catalog, default_database, source_customer, project=[city_id, customer_id], metadata=[]]], fields=[city_id, customer_id], changelogMode=[I,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+ +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[I,UA,D])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[city_id, city_name, customer_cnt])
++- Calc(select=[city_id, city_name, customer_cnt])
+ +- Join(joinType=[InnerJoin], where=[(city_id = id)], select=[city_id, customer_cnt, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
+ :- Exchange(distribution=[hash[city_id]])
+ : +- GroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS customer_cnt])
+ : +- Exchange(distribution=[hash[city_id]])
+ : +- Calc(select=[city_id])
+ : +- ChangelogNormalize(key=[customer_id])
+ : +- Exchange(distribution=[hash[customer_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, source_customer, project=[city_id, customer_id], metadata=[]]], fields=[city_id, customer_id])
+ +- Exchange(distribution=[hash[id]])
+ +- ChangelogNormalize(key=[id])
+ +- Exchange(distribution=[hash[id]])
+ +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testJoinOutputUpsertKeyNotMatchSinkPk">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[city_id, city_name, customer_cnt])
++- LogicalProject(city_id=[$0], city_name=[$3], customer_cnt=[$1])
+ +- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+ :- LogicalAggregate(group=[{0}], customer_cnt=[COUNT()])
+ : +- LogicalProject(city_id=[$1])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, source_customer]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, source_city]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[city_id, city_name, customer_cnt], changelogMode=[NONE])
++- Calc(select=[city_id, city_name, customer_cnt], changelogMode=[I,UB,UA,D])
+ +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, customer_cnt, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
+ :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UB,UA,D])
+ : +- GroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS customer_cnt], changelogMode=[I,UB,UA,D])
+ : +- Exchange(distribution=[hash[city_id]], changelogMode=[I,UB,UA,D])
+ : +- Calc(select=[city_id], changelogMode=[I,UB,UA,D])
+ : +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
+ : +- Exchange(distribution=[hash[customer_id]], changelogMode=[I,UA,D])
+ : +- TableSourceScan(table=[[default_catalog, default_database, source_customer, project=[city_id, customer_id], metadata=[]]], fields=[city_id, customer_id], changelogMode=[I,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+ +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+ +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[I,UA,D])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[city_id, city_name, customer_cnt], upsertMaterialize=[true])
++- Calc(select=[city_id, city_name, customer_cnt])
+ +- Join(joinType=[InnerJoin], where=[(city_id = id)], select=[city_id, customer_cnt, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
+ :- Exchange(distribution=[hash[city_id]])
+ : +- GroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS customer_cnt])
+ : +- Exchange(distribution=[hash[city_id]])
+ : +- Calc(select=[city_id])
+ : +- ChangelogNormalize(key=[customer_id])
+ : +- Exchange(distribution=[hash[customer_id]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, source_customer, project=[city_id, customer_id], metadata=[]]], fields=[city_id, customer_id])
+ +- Exchange(distribution=[hash[id]])
+ +- ChangelogNormalize(key=[id])
+ +- Exchange(distribution=[hash[id]])
+ +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testJoinWithSort">
<Resource name="sql">
<![CDATA[
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
index 3527d0b..3963652 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/RankTest.scala
@@ -19,10 +19,8 @@ package org.apache.flink.table.planner.plan.stream.sql
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
-import org.apache.flink.table.api.internal.TableEnvironmentInternal
import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
import org.apache.flink.table.planner.utils.TableTestBase
-import org.apache.flink.table.types.logical.{BigIntType, IntType, VarCharType}
import org.junit.Test
@@ -881,5 +879,95 @@ class RankTest extends TableTestBase {
util.verifyExecPlan(stmtSet)
}
+ @Test
+ def testRankOutputUpsertKeyNotMatchSinkPk(): Unit = {
+ // test for FLINK-20370
+ util.tableEnv.executeSql(
+ """
+ |CREATE TABLE sink (
+ | a INT,
+ | b VARCHAR,
+ | c BIGINT,
+ | PRIMARY KEY (a) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'values'
+ | ,'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+
+ val sql =
+ """
+ |INSERT INTO sink
+ |SELECT a, b, c FROM (
+ | SELECT *, ROW_NUMBER() OVER (PARTITION BY b ORDER BY c DESC) AS rn
+ | FROM MyTable
+ | )
+ |WHERE rn <= 100
+ |""".stripMargin
+ // verify UB should be reserved and upsertMaterialize added if rank outputs' upsert keys differ
+ // from sink's pks
+ util.verifyExplainInsert(sql, ExplainDetail.CHANGELOG_MODE)
+ }
+
+ @Test
+ def testRankOutputUpsertKeyInSinkPk(): Unit = {
+ // test for FLINK-20370
+ util.tableEnv.executeSql(
+ """
+ |CREATE TABLE sink (
+ | a INT,
+ | b VARCHAR,
+ | c BIGINT,
+ | PRIMARY KEY (a, b) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'values'
+ | ,'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+
+ val sql =
+ """
+ |INSERT INTO sink
+ |SELECT a, b, c FROM (
+ | SELECT *, ROW_NUMBER() OVER (PARTITION BY a ORDER BY c DESC) AS rn
+ | FROM MyTable
+ | )
+ |WHERE rn <= 100
+ |""".stripMargin
+
+ // verify UB should be reserved and no upsertMaterialize added if rank outputs' upsert keys are
+ // a subset of sink's pks
+ util.verifyExplainInsert(sql, ExplainDetail.CHANGELOG_MODE)
+ }
+
+ @Test
+ def testRankOutputLostUpsertKeyWithSinkPk(): Unit = {
+ // test for FLINK-20370
+ util.tableEnv.executeSql(
+ """
+ |CREATE TABLE sink (
+ | a INT,
+ | c BIGINT,
+ | rn BIGINT,
+ | PRIMARY KEY (a) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'values'
+ | ,'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+
+ val sql =
+ """
+ |INSERT INTO sink
+ |SELECT a, c, rn FROM (
+ | SELECT *, ROW_NUMBER() OVER (PARTITION BY b ORDER BY c DESC) AS rn
+ | FROM MyTable
+ | )
+ |WHERE rn <= 100
+ |""".stripMargin
+ // verify UB should be reserved and upsertMaterialize added if the rank output loses upsert keys
+ util.verifyExplainInsert(sql, ExplainDetail.CHANGELOG_MODE)
+ }
+
// TODO add tests about multi-sinks and udf
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
index 54a01be..883415d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
@@ -296,4 +296,70 @@ class AggregateTest extends TableTestBase {
|) t
|""".stripMargin)
}
+
+ @Test
+ def testGroupKeyNotMatchSinkPk(): Unit = {
+ // test for FLINK-20370
+ util.tableEnv.executeSql(
+ """
+ |CREATE TABLE sink (
+ | id VARCHAR,
+ | cnt BIGINT,
+ | PRIMARY KEY (cnt) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'values'
+ | ,'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+ util.verifyExplainInsert(
+ """
+ |INSERT INTO sink
+ |SELECT c, COUNT(*) cnt FROM T GROUP BY c
+ |""".stripMargin, ExplainDetail.CHANGELOG_MODE)
+ }
+
+ @Test
+ def testGroupKeyInSinkPk(): Unit = {
+ // test for FLINK-20370
+ util.tableEnv.executeSql(
+ """
+ |CREATE TABLE sink (
+ | a INT,
+ | b BIGINT,
+ | cnt BIGINT,
+ | PRIMARY KEY (a, b) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'values'
+ | ,'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+ util.verifyExplainInsert(
+ """
+ |INSERT INTO sink
+ |SELECT a, MAX(b) b, COUNT(*) cnt FROM T GROUP BY a
+ |""".stripMargin, ExplainDetail.CHANGELOG_MODE)
+ }
+
+ @Test
+ def testGroupResultLostUpsertKeyWithSinkPk(): Unit = {
+ // test for FLINK-20370
+ util.tableEnv.executeSql(
+ """
+ |CREATE TABLE sink (
+ | id VARCHAR,
+ | cnt BIGINT,
+ | PRIMARY KEY (id) NOT ENFORCED
+ |) WITH (
+ | 'connector' = 'values'
+ | ,'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+
+ // verify UB should be reserved and upsertMaterialize added if the group result loses upsert keys
+ util.verifyExplainInsert(
+ """
+ |INSERT INTO sink
+ |SELECT c, COUNT(*) cnt FROM T GROUP BY a, c
+ |""".stripMargin, ExplainDetail.CHANGELOG_MODE)
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
index fd70df2..881667a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
@@ -333,4 +333,164 @@ class JoinTest extends TableTestBase {
| WHERE T1.person = T2.person
|""".stripMargin)
}
+
+ @Test
+ def testJoinOutputUpsertKeyNotMatchSinkPk(): Unit = {
+ // test for FLINK-20370
+ util.tableEnv.executeSql(
+ """
+ |create table source_city (
+ | id varchar,
+ | city_name varchar,
+ | primary key (id) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql(
+ """
+ |create table source_customer (
+ | customer_id varchar,
+ | city_id varchar,
+ | age int,
+ | gender varchar,
+ | update_time timestamp(3),
+ | primary key (customer_id) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql(
+ """
+ |create table sink (
+ | city_id varchar,
+ | city_name varchar,
+ | customer_cnt bigint,
+ | primary key (city_name) not enforced
+ |) with (
+ | 'connector' = 'values'
+ | ,'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+
+ // verify UB should be reserved and upsertMaterialize added if join outputs' upsert keys differ
+ // from sink's pks
+ util.verifyExplainInsert(
+ """
+ |insert into sink
+ |select t1.city_id, t2.city_name, t1.customer_cnt
+ | from (select city_id, count(*) customer_cnt from source_customer group by city_id) t1
+ | join source_city t2 on t1.city_id = t2.id
+ |""".stripMargin, ExplainDetail.CHANGELOG_MODE)
+ }
+
+ @Test
+ def testJoinOutputUpsertKeyInSinkPk(): Unit = {
+ // test for FLINK-20370
+ util.tableEnv.executeSql(
+ """
+ |create table source_city (
+ | id varchar,
+ | city_name varchar,
+ | primary key (id) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql(
+ """
+ |create table source_customer (
+ | customer_id varchar,
+ | city_id varchar,
+ | age int,
+ | gender varchar,
+ | update_time timestamp(3),
+ | primary key (customer_id) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql(
+ """
+ |create table sink (
+ | city_id varchar,
+ | city_name varchar,
+ | customer_cnt bigint,
+ | primary key (city_id, city_name) not enforced
+ |) with (
+ | 'connector' = 'values'
+ | ,'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+
+ // verify UB should be reserved and no upsertMaterialize added if join outputs' upsert keys are
+ // a subset of sink's pks
+ util.verifyExplainInsert(
+ """
+ |insert into sink
+ |select t1.city_id, t2.city_name, t1.customer_cnt
+ | from (select city_id, count(*) customer_cnt from source_customer group by city_id) t1
+ | join source_city t2 on t1.city_id = t2.id
+ |""".stripMargin, ExplainDetail.CHANGELOG_MODE)
+ }
+
+ @Test
+ def testJoinOutputLostUpsertKeyWithSinkPk(): Unit = {
+ // test for FLINK-20370
+ util.tableEnv.executeSql(
+ """
+ |create table source_city (
+ | id varchar,
+ | city_name varchar,
+ | primary key (id) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql(
+ """
+ |create table source_customer (
+ | customer_id varchar,
+ | city_id varchar,
+ | age int,
+ | gender varchar,
+ | update_time timestamp(3),
+ | primary key (customer_id) not enforced
+ |) with (
+ | 'connector' = 'values',
+ | 'changelog-mode' = 'I,UA,D'
+ |)
+ |""".stripMargin)
+
+ util.tableEnv.executeSql(
+ """
+ |create table sink (
+ | city_name varchar,
+ | customer_cnt bigint,
+ | primary key (city_name) not enforced
+ |) with (
+ | 'connector' = 'values'
+ | ,'sink-insert-only' = 'false'
+ |)
+ |""".stripMargin)
+
+ // verify UB should be reserved and upsertMaterialize added if the join output loses upsert keys
+ util.verifyExplainInsert(
+ """
+ |insert into sink
+ |select t2.city_name, t1.customer_cnt
+ | from (select city_id, count(*) customer_cnt from source_customer group by city_id) t1
+ | join source_city t2 on t1.city_id = t2.id
+ |""".stripMargin, ExplainDetail.CHANGELOG_MODE)
+ }
}