You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2023/01/31 11:15:02 UTC

[flink] branch master updated: [hotfix][table-planner] Correct StreamExecLookupJoin from SingleTransformationTranslator to MultipleTransformationTranslator

This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ffa50a1043e [hotfix][table-planner] Correct StreamExecLookupJoin from SingleTransformationTranslator to MultipleTransformationTranslator
ffa50a1043e is described below

commit ffa50a1043e61678c4e81bef06f53d6b79fbe02c
Author: lincoln lee <li...@gmail.com>
AuthorDate: Tue Jan 31 19:14:53 2023 +0800

    [hotfix][table-planner] Correct StreamExecLookupJoin from SingleTransformationTranslator to MultipleTransformationTranslator
    
    This closes #21794
---
 .../table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java      | 4 +++-
 .../table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java    | 4 +---
 .../table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java    | 4 +++-
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java
index 6e4ef47ff7b..d1a16a42fed 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec;
 import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
@@ -41,7 +42,8 @@ import java.util.List;
 import java.util.Map;
 
 /** {@link BatchExecNode} for a temporal table join that is implemented by lookup. */
-public class BatchExecLookupJoin extends CommonExecLookupJoin implements BatchExecNode<RowData> {
+public class BatchExecLookupJoin extends CommonExecLookupJoin
+        implements BatchExecNode<RowData>, SingleTransformationTranslator<RowData> {
     public BatchExecLookupJoin(
             ReadableConfig tableConfig,
             FlinkJoinType joinType,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
index d520f7630ce..70ef0aff24d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
@@ -56,7 +56,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
-import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
@@ -151,8 +150,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * 3) join left input record and looked-up records <br>
  * 4) only outputs the rows which match the condition <br>
  */
-public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
-        implements SingleTransformationTranslator<RowData> {
+public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {
 
     public static final String LOOKUP_JOIN_TRANSFORMATION = "lookup-join";
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
index 5f4ebd7bffb..28d15bff07e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec;
 import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
@@ -53,7 +54,8 @@ import java.util.Map;
         producedTransformations = CommonExecLookupJoin.LOOKUP_JOIN_TRANSFORMATION,
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
-public class StreamExecLookupJoin extends CommonExecLookupJoin implements StreamExecNode<RowData> {
+public class StreamExecLookupJoin extends CommonExecLookupJoin
+        implements StreamExecNode<RowData>, MultipleTransformationTranslator<RowData> {
     public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = "requireUpsertMaterialize";
 
     public static final String FIELD_NAME_LOOKUP_KEY_CONTAINS_PRIMARY_KEY =