You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/09/09 02:45:35 UTC

[flink] branch master updated: [FLINK-28787][table-planner] Rename getUniqueKeys to getUpsertKeys in CommonPhysicalJoin

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 347316ea639 [FLINK-28787][table-planner] Rename getUniqueKeys to getUpsertKeys in CommonPhysicalJoin
347316ea639 is described below

commit 347316ea6394b24c4471aa8616f2632e126f733d
Author: lincoln lee <li...@gmail.com>
AuthorDate: Tue Sep 6 16:58:08 2022 +0800

    [FLINK-28787][table-planner] Rename getUniqueKeys to getUpsertKeys in CommonPhysicalJoin
    
    This closes #20763
---
 .../plan/nodes/exec/stream/StreamExecJoin.java     | 35 ++++++++++++----------
 .../StreamNonDeterministicUpdatePlanVisitor.java   |  4 +--
 .../nodes/physical/common/CommonPhysicalJoin.scala |  3 +-
 .../nodes/physical/stream/StreamPhysicalJoin.scala | 10 +++----
 .../JoinJsonPlanTest_jsonplan/testInnerJoin.out    |  2 --
 .../testInnerJoinWithEqualPk.out                   |  4 +--
 .../testInnerJoinWithPk.out                        |  4 +--
 .../testLeftJoinNonEqui.out                        |  2 --
 8 files changed, 31 insertions(+), 33 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
index b8a45b8d6f1..dbf399d8ae5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
@@ -48,6 +48,7 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.List;
@@ -73,23 +74,25 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
     public static final String JOIN_TRANSFORMATION = "join";
 
     public static final String FIELD_NAME_JOIN_SPEC = "joinSpec";
-    public static final String FIELD_NAME_LEFT_UNIQUE_KEYS = "leftUniqueKeys";
-    public static final String FIELD_NAME_RIGHT_UNIQUE_KEYS = "rightUniqueKeys";
+    public static final String FIELD_NAME_LEFT_UPSERT_KEYS = "leftUpsertKeys";
+    public static final String FIELD_NAME_RIGHT_UPSERT_KEYS = "rightUpsertKeys";
 
     @JsonProperty(FIELD_NAME_JOIN_SPEC)
     private final JoinSpec joinSpec;
 
-    @JsonProperty(FIELD_NAME_LEFT_UNIQUE_KEYS)
-    private final List<int[]> leftUniqueKeys;
+    @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEYS)
+    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+    private final List<int[]> leftUpsertKeys;
 
-    @JsonProperty(FIELD_NAME_RIGHT_UNIQUE_KEYS)
-    private final List<int[]> rightUniqueKeys;
+    @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEYS)
+    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+    private final List<int[]> rightUpsertKeys;
 
     public StreamExecJoin(
             ReadableConfig tableConfig,
             JoinSpec joinSpec,
-            List<int[]> leftUniqueKeys,
-            List<int[]> rightUniqueKeys,
+            List<int[]> leftUpsertKeys,
+            List<int[]> rightUpsertKeys,
             InputProperty leftInputProperty,
             InputProperty rightInputProperty,
             RowType outputType,
@@ -99,8 +102,8 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
                 ExecNodeContext.newContext(StreamExecJoin.class),
                 ExecNodeContext.newPersistedConfig(StreamExecJoin.class, tableConfig),
                 joinSpec,
-                leftUniqueKeys,
-                rightUniqueKeys,
+                leftUpsertKeys,
+                rightUpsertKeys,
                 Lists.newArrayList(leftInputProperty, rightInputProperty),
                 outputType,
                 description);
@@ -112,16 +115,16 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
             @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
             @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
             @JsonProperty(FIELD_NAME_JOIN_SPEC) JoinSpec joinSpec,
-            @JsonProperty(FIELD_NAME_LEFT_UNIQUE_KEYS) List<int[]> leftUniqueKeys,
-            @JsonProperty(FIELD_NAME_RIGHT_UNIQUE_KEYS) List<int[]> rightUniqueKeys,
+            @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEYS) List<int[]> leftUpsertKeys,
+            @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEYS) List<int[]> rightUpsertKeys,
             @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
             @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
             @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
         super(id, context, persistedConfig, inputProperties, outputType, description);
         checkArgument(inputProperties.size() == 2);
         this.joinSpec = checkNotNull(joinSpec);
-        this.leftUniqueKeys = leftUniqueKeys;
-        this.rightUniqueKeys = rightUniqueKeys;
+        this.leftUpsertKeys = leftUpsertKeys;
+        this.rightUpsertKeys = rightUpsertKeys;
     }
 
     @Override
@@ -149,7 +152,7 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
                         planner.getFlinkContext().getClassLoader(),
                         leftTypeInfo,
                         leftJoinKey,
-                        leftUniqueKeys);
+                        leftUpsertKeys);
 
         final InternalTypeInfo<RowData> rightTypeInfo = InternalTypeInfo.of(rightType);
         final JoinInputSideSpec rightInputSpec =
@@ -157,7 +160,7 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
                         planner.getFlinkContext().getClassLoader(),
                         rightTypeInfo,
                         rightJoinKey,
-                        rightUniqueKeys);
+                        rightUpsertKeys);
 
         GeneratedJoinCondition generatedCondition =
                 JoinUtil.generateConditionFunction(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
index 2caa2bb951b..9aff0a7a014 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
@@ -498,7 +498,7 @@ public class StreamNonDeterministicUpdatePlanVisitor {
                             join.joinSpec().getLeftKeys(),
                             // TODO remove this conversion when scala-free was total done.
                             scala.collection.JavaConverters.seqAsJavaList(
-                                    join.getUniqueKeys(leftRel, join.joinSpec().getLeftKeys())));
+                                    join.getUpsertKeys(leftRel, join.joinSpec().getLeftKeys())));
             StreamPhysicalRel newRight =
                     visitJoinChild(
                             requireDeterminism,
@@ -509,7 +509,7 @@ public class StreamNonDeterministicUpdatePlanVisitor {
                             join.joinSpec().getRightKeys(),
                             // TODO remove this conversion when scala-free was total done.
                             scala.collection.JavaConverters.seqAsJavaList(
-                                    join.getUniqueKeys(rightRel, join.joinSpec().getRightKeys())));
+                                    join.getUpsertKeys(rightRel, join.joinSpec().getRightKeys())));
 
             return (StreamPhysicalRel)
                     join.copy(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala
index bee5b1b00f2..871eccf00ae 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalJoin.scala
@@ -81,8 +81,7 @@ abstract class CommonPhysicalJoin(
       .item("select", getRowType.getFieldNames.mkString(", "))
   }
 
-  def getUniqueKeys(input: RelNode, keys: Array[Int]): List[Array[Int]] = {
-    // TODO update method name in FLINK-28787
+  def getUpsertKeys(input: RelNode, keys: Array[Int]): List[Array[Int]] = {
     val upsertKeys = FlinkRelMetadataQuery
       .reuseOrCreate(cluster.getMetadataQuery)
       .getUpsertKeysInKeyGroupRange(input, keys)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
index ed3e40b52ca..60eb40d14d9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalJoin.scala
@@ -67,7 +67,7 @@ class StreamPhysicalJoin(
   def inputUniqueKeyContainsJoinKey(inputOrdinal: Int): Boolean = {
     val input = getInput(inputOrdinal)
     val joinKeys = if (inputOrdinal == 0) joinSpec.getLeftKeys else joinSpec.getRightKeys
-    val inputUniqueKeys = getUniqueKeys(input, joinKeys)
+    val inputUniqueKeys = getUpsertKeys(input, joinKeys)
     if (inputUniqueKeys != null) {
       inputUniqueKeys.exists(uniqueKey => joinKeys.forall(uniqueKey.contains(_)))
     } else {
@@ -96,7 +96,7 @@ class StreamPhysicalJoin(
           unwrapClassLoader(left),
           InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(left.getRowType)),
           joinSpec.getLeftKeys,
-          getUniqueKeys(left, joinSpec.getLeftKeys)
+          getUpsertKeys(left, joinSpec.getLeftKeys)
         )
       )
       .item(
@@ -105,7 +105,7 @@ class StreamPhysicalJoin(
           unwrapClassLoader(right),
           InternalTypeInfo.of(FlinkTypeFactory.toLogicalRowType(right.getRowType)),
           joinSpec.getRightKeys,
-          getUniqueKeys(right, joinSpec.getRightKeys)
+          getUpsertKeys(right, joinSpec.getRightKeys)
         )
       )
   }
@@ -119,8 +119,8 @@ class StreamPhysicalJoin(
     new StreamExecJoin(
       unwrapTableConfig(this),
       joinSpec,
-      getUniqueKeys(left, joinSpec.getLeftKeys),
-      getUniqueKeys(right, joinSpec.getRightKeys),
+      getUpsertKeys(left, joinSpec.getLeftKeys),
+      getUpsertKeys(right, joinSpec.getRightKeys),
       InputProperty.DEFAULT,
       InputProperty.DEFAULT,
       FlinkTypeFactory.toLogicalRowType(getRowType),
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out
index 0355fb03137..243f4d203ec 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoin.out
@@ -116,8 +116,6 @@
       "filterNulls" : [ true ],
       "nonEquiCondition" : null
     },
-    "leftUniqueKeys" : [ ],
-    "rightUniqueKeys" : [ ],
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out
index 5d80fb2baac..141b22ea160 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out
@@ -184,8 +184,8 @@
       "filterNulls" : [ true ],
       "nonEquiCondition" : null
     },
-    "leftUniqueKeys" : [ [ 0 ] ],
-    "rightUniqueKeys" : [ [ 0 ] ],
+    "leftUpsertKeys" : [ [ 0 ] ],
+    "rightUpsertKeys" : [ [ 0 ] ],
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out
index 149408c5e1c..b0980c85f52 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out
@@ -246,8 +246,8 @@
       "filterNulls" : [ true ],
       "nonEquiCondition" : null
     },
-    "leftUniqueKeys" : [ [ 1 ] ],
-    "rightUniqueKeys" : [ [ 1 ] ],
+    "leftUpsertKeys" : [ [ 1 ] ],
+    "rightUpsertKeys" : [ [ 1 ] ],
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out
index 27df7d82b1c..27ad53723c9 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testLeftJoinNonEqui.out
@@ -130,8 +130,6 @@
         "type" : "BOOLEAN"
       }
     },
-    "leftUniqueKeys" : [ ],
-    "rightUniqueKeys" : [ ],
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"