You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2023/01/31 11:15:02 UTC
[flink] branch master updated: [hotfix][table-planner] Correct StreamExecLookupJoin from SingleTransformationTranslator to MultipleTransformationTranslator
This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ffa50a1043e [hotfix][table-planner] Correct StreamExecLookupJoin from SingleTransformationTranslator to MultipleTransformationTranslator
ffa50a1043e is described below
commit ffa50a1043e61678c4e81bef06f53d6b79fbe02c
Author: lincoln lee <li...@gmail.com>
AuthorDate: Tue Jan 31 19:14:53 2023 +0800
[hotfix][table-planner] Correct StreamExecLookupJoin from SingleTransformationTranslator to MultipleTransformationTranslator
This closes #21794
---
.../table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java | 4 +++-
.../table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java | 4 +---
.../table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java | 4 +++-
3 files changed, 7 insertions(+), 5 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java
index 6e4ef47ff7b..d1a16a42fed 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin;
import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec;
import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
@@ -41,7 +42,8 @@ import java.util.List;
import java.util.Map;
/** {@link BatchExecNode} for temporal table join that is implemented by lookup. */
-public class BatchExecLookupJoin extends CommonExecLookupJoin implements BatchExecNode<RowData> {
+public class BatchExecLookupJoin extends CommonExecLookupJoin
+ implements BatchExecNode<RowData>, SingleTransformationTranslator<RowData> {
public BatchExecLookupJoin(
ReadableConfig tableConfig,
FlinkJoinType joinType,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
index d520f7630ce..70ef0aff24d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
@@ -56,7 +56,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
-import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
@@ -151,8 +150,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* 3) join left input record and lookup-ed records <br>
* 4) only outputs the rows which match the condition <br>
*/
-public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData>
- implements SingleTransformationTranslator<RowData> {
+public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {
public static final String LOOKUP_JOIN_TRANSFORMATION = "lookup-join";
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
index 5f4ebd7bffb..28d15bff07e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin;
import org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec;
import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
@@ -53,7 +54,8 @@ import java.util.Map;
producedTransformations = CommonExecLookupJoin.LOOKUP_JOIN_TRANSFORMATION,
minPlanVersion = FlinkVersion.v1_15,
minStateVersion = FlinkVersion.v1_15)
-public class StreamExecLookupJoin extends CommonExecLookupJoin implements StreamExecNode<RowData> {
+public class StreamExecLookupJoin extends CommonExecLookupJoin
+ implements StreamExecNode<RowData>, MultipleTransformationTranslator<RowData> {
public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = "requireUpsertMaterialize";
public static final String FIELD_NAME_LOOKUP_KEY_CONTAINS_PRIMARY_KEY =