You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/09/09 02:45:35 UTC
[flink] branch master updated: [FLINK-28787][table-planner] Rename getUniqueKeys to getUpsertKeys in CommonPhysicalJoin
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 347316ea639 [FLINK-28787][table-planner] Rename getUniqueKeys to getUpsertKeys in CommonPhysicalJoin
347316ea639 is described below
commit 347316ea6394b24c4471aa8616f2632e126f733d
Author: lincoln lee <li...@gmail.com>
AuthorDate: Tue Sep 6 16:58:08 2022 +0800
[FLINK-28787][table-planner] Rename getUniqueKeys to getUpsertKeys in CommonPhysicalJoin
This closes #20763
---
.../plan/nodes/exec/stream/StreamExecJoin.java | 35 ++++++++++++----------
.../StreamNonDeterministicUpdatePlanVisitor.java | 4 +--
.../nodes/physical/common/CommonPhysicalJoin.scala | 3 +-
.../nodes/physical/stream/StreamPhysicalJoin.scala | 10 +++----
.../JoinJsonPlanTest_jsonplan/testInnerJoin.out | 2 --
.../testInnerJoinWithEqualPk.out | 4 +--
.../testInnerJoinWithPk.out | 4 +--
.../testLeftJoinNonEqui.out | 2 --
8 files changed, 31 insertions(+), 33 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
index b8a45b8d6f1..dbf399d8ae5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
@@ -48,6 +48,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
@@ -73,23 +74,25 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
public static final String JOIN_TRANSFORMATION = "join";
public static final String FIELD_NAME_JOIN_SPEC = "joinSpec";
- public static final String FIELD_NAME_LEFT_UNIQUE_KEYS = "leftUniqueKeys";
- public static final String FIELD_NAME_RIGHT_UNIQUE_KEYS = "rightUniqueKeys";
+ public static final String FIELD_NAME_LEFT_UPSERT_KEYS = "leftUpsertKeys";
+ public static final String FIELD_NAME_RIGHT_UPSERT_KEYS = "rightUpsertKeys";
@JsonProperty(FIELD_NAME_JOIN_SPEC)
private final JoinSpec joinSpec;
- @JsonProperty(FIELD_NAME_LEFT_UNIQUE_KEYS)
- private final List<int[]> leftUniqueKeys;
+ @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEYS)
+ @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+ private final List<int[]> leftUpsertKeys;
- @JsonProperty(FIELD_NAME_RIGHT_UNIQUE_KEYS)
- private final List<int[]> rightUniqueKeys;
+ @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEYS)
+ @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+ private final List<int[]> rightUpsertKeys;
public StreamExecJoin(
ReadableConfig tableConfig,
JoinSpec joinSpec,
- List<int[]> leftUniqueKeys,
- List<int[]> rightUniqueKeys,
+ List<int[]> leftUpsertKeys,
+ List<int[]> rightUpsertKeys,
InputProperty leftInputProperty,
InputProperty rightInputProperty,
RowType outputType,
@@ -99,8 +102,8 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
ExecNodeContext.newContext(StreamExecJoin.class),
ExecNodeContext.newPersistedConfig(StreamExecJoin.class, tableConfig),
joinSpec,
- leftUniqueKeys,
- rightUniqueKeys,
+ leftUpsertKeys,
+ rightUpsertKeys,
Lists.newArrayList(leftInputProperty, rightInputProperty),
outputType,
description);
@@ -112,16 +115,16 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
@JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
@JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
@JsonProperty(FIELD_NAME_JOIN_SPEC) JoinSpec joinSpec,
- @JsonProperty(FIELD_NAME_LEFT_UNIQUE_KEYS) List<int[]> leftUniqueKeys,
- @JsonProperty(FIELD_NAME_RIGHT_UNIQUE_KEYS) List<int[]> rightUniqueKeys,
+ @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEYS) List<int[]> leftUpsertKeys,
+ @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEYS) List<int[]> rightUpsertKeys,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
super(id, context, persistedConfig, inputProperties, outputType, description);
checkArgument(inputProperties.size() == 2);
this.joinSpec = checkNotNull(joinSpec);
- this.leftUniqueKeys = leftUniqueKeys;
- this.rightUniqueKeys = rightUniqueKeys;
+ this.leftUpsertKeys = leftUpsertKeys;
+ this.rightUpsertKeys = rightUpsertKeys;
}
@Override
@@ -149,7 +152,7 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
planner.getFlinkContext().getClassLoader(),
leftTypeInfo,
leftJoinKey,
- leftUniqueKeys);
+ leftUpsertKeys);
final InternalTypeInfo<RowData> rightTypeInfo = InternalTypeInfo.of(rightType);
final JoinInputSideSpec rightInputSpec =
@@ -157,7 +160,7 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
planner.getFlinkContext().getClassLoader(),
rightTypeInfo,
rightJoinKey,
- rightUniqueKeys);
+ rightUpsertKeys);
GeneratedJoinCondition generatedCondition =
JoinUtil.generateConditionFunction(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
index 2caa2bb951b..9aff0a7a014 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
@@ -498,7 +498,7 @@ public class StreamNonDeterministicUpdatePlanVisitor {
join.joinSpec().getLeftKeys(),
// TODO remove this conversion when scala-free was total done.
scala.collection.JavaConverters.seqAsJavaList(
- join.getUniqueKeys(leftRel, join.joinSpec().getLeftKeys())));
+ join.getUpsertKeys(leftRel, join.joinSpec().getLeftKeys())));
StreamPhysicalRel newRight =
visitJoinChild(
requireDeterminism,
@@ -509,7 +509,7 @@ public class StreamNonDeterministicUpdatePlanVisitor {
join.joinSpec().getRightKeys(),
// TODO remove this conversion when scala-free was total done.
scala.collection.JavaConverters.seqAsJavaList(
- join.getUniqueKeys(rightRel, join.joinSpec().getRightKeys())));
+ join.getUpsertKeys(rightRel, join.joinSpec().getRightKeys())));
return (StreamPhysicalRel)
join.copy(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala
index bee5b1b00f2..871eccf00ae 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala
@@ -81,8 +81,7 @@ abstract class CommonPhysicalJoin(
.item("select", getRowType.getFieldNames.mkString(", "))
}
- def getUniqueKeys(input: RelNode, keys: Array[Int]): List[Array[Int]] = {
- // TODO update method name in FLINK-28787
+ def getUpsertKeys(input: RelNode, keys: Array[Int]): List[Array[Int]] = {
val upsertKeys = FlinkRelMetadataQuery
.reuseOrCreate(cluster.getMetadataQuery)
.getUpsertKeysInKeyGroupRange(input, keys)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
index ed3e40b52ca..60eb40d14d9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
@@ -67,7 +67,7 @@ class StreamPhysicalJoin(
def inputUniqueKeyContainsJoinKey(inputOrdinal: Int): Boolean = {
val input = getInput(inputOrdinal)
val joinKeys = if (inputOrdinal == 0) joinSpec.getLeftKeys else joinSpec.getRightKeys
- val inputUniqueKeys = getUniqueKeys(input, joinKeys)
+ val inputUniqueKeys = getUpsertKeys(input, joinKeys)
if (inputUniqueKeys != null) {
inputUniqueKeys.exists(uniqueKey => joinKeys.forall(uniqueKey.contains(_)))
} else {
@@ -96,7 +96,7 @@ class StreamPhysicalJoin(
unwrapClassLoader(left),
InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(left.getRowType)),
joinSpec.getLeftKeys,
- getUniqueKeys(left, joinSpec.getLeftKeys)
+ getUpsertKeys(left, joinSpec.getLeftKeys)
)
)
.item(
@@ -105,7 +105,7 @@ class StreamPhysicalJoin(
unwrapClassLoader(right),
InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(right.getRowType)),
joinSpec.getRightKeys,
- getUniqueKeys(right, joinSpec.getRightKeys)
+ getUpsertKeys(right, joinSpec.getRightKeys)
)
)
}
@@ -119,8 +119,8 @@ class StreamPhysicalJoin(
new StreamExecJoin(
unwrapTableConfig(this),
joinSpec,
- getUniqueKeys(left, joinSpec.getLeftKeys),
- getUniqueKeys(right, joinSpec.getRightKeys),
+ getUpsertKeys(left, joinSpec.getLeftKeys),
+ getUpsertKeys(right, joinSpec.getRightKeys),
InputProperty.DEFAULT,
InputProperty.DEFAULT,
FlinkTypeFactory.toLogicalRowType(getRowType),
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out
index 0355fb03137..243f4d203ec 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out
@@ -116,8 +116,6 @@
"filterNulls" : [ true ],
"nonEquiCondition" : null
},
- "leftUniqueKeys" : [ ],
- "rightUniqueKeys" : [ ],
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out
index 5d80fb2baac..141b22ea160 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out
@@ -184,8 +184,8 @@
"filterNulls" : [ true ],
"nonEquiCondition" : null
},
- "leftUniqueKeys" : [ [ 0 ] ],
- "rightUniqueKeys" : [ [ 0 ] ],
+ "leftUpsertKeys" : [ [ 0 ] ],
+ "rightUpsertKeys" : [ [ 0 ] ],
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out
index 149408c5e1c..b0980c85f52 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out
@@ -246,8 +246,8 @@
"filterNulls" : [ true ],
"nonEquiCondition" : null
},
- "leftUniqueKeys" : [ [ 1 ] ],
- "rightUniqueKeys" : [ [ 1 ] ],
+ "leftUpsertKeys" : [ [ 1 ] ],
+ "rightUpsertKeys" : [ [ 1 ] ],
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out
index 27df7d82b1c..27ad53723c9 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out
@@ -130,8 +130,6 @@
"type" : "BOOLEAN"
}
},
- "leftUniqueKeys" : [ ],
- "rightUniqueKeys" : [ ],
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"