Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/15 08:56:39 UTC

[GitHub] [flink] wenlong88 opened a new pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin

wenlong88 opened a new pull request #14663:
URL: https://github.com/apache/flink/pull/14663


   ## What is the purpose of the change
   Separate the implementation of StreamExecLookupJoin and BatchExecLookupJoin.
   
   
   ## Brief change log
   Introduce StreamPhysicalLookupJoin and BatchPhysicalLookupJoin, and make their ExecNodes extend only ExecNode.
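   The split described above can be sketched as follows. This is a simplified illustration of the intended class layout, not the actual Flink planner API: the class names follow the PR title, but the methods and signatures here are hypothetical stand-ins.

   ```java
   // Simplified sketch of the class split: shared translation logic moves into a
   // common base class, while stream and batch each get a separate exec node
   // instead of one class serving both modes. Signatures are illustrative only.
   abstract class ExecNodeBase<T> {
       abstract String describe();
   }

   // Shared lookup-join logic lives in the common base class ...
   abstract class CommonExecLookupJoin extends ExecNodeBase<Object> {
       String describe() {
           return "LookupJoin(" + mode() + ")";
       }

       abstract String mode();
   }

   // ... while each execution mode has its own concrete node.
   class StreamExecLookupJoin extends CommonExecLookupJoin {
       String mode() {
           return "stream";
       }
   }

   class BatchExecLookupJoin extends CommonExecLookupJoin {
       String mode() {
           return "batch";
       }
   }
   ```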
   
   
   ## Verifying this change
   
   This change is a refactoring that is covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14663:
URL: https://github.com/apache/flink/pull/14663#issuecomment-760764364


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit ca8374f80e4fbfebe7815dbd2c3182c79256f1cb (Fri May 28 07:09:59 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.<details><summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14663:
URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12108",
       "triggerID" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4224ed129067a1550f812e8803c3298d523c2495",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12120",
       "triggerID" : "4224ed129067a1550f812e8803c3298d523c2495",
       "triggerType" : "PUSH"
     }, {
       "hash" : "852af6f79074f5d25ef8194963c464feb0038787",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163",
       "triggerID" : "852af6f79074f5d25ef8194963c464feb0038787",
       "triggerType" : "PUSH"
     }, {
       "hash" : "606f33587dd148ad9e2dc4c9fb91986e4c995bc3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12168",
       "triggerID" : "606f33587dd148ad9e2dc4c9fb91986e4c995bc3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ca8374f80e4fbfebe7815dbd2c3182c79256f1cb",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12170",
       "triggerID" : "ca8374f80e4fbfebe7815dbd2c3182c79256f1cb",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ca8374f80e4fbfebe7815dbd2c3182c79256f1cb Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12170) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14663:
URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12108",
       "triggerID" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4224ed129067a1550f812e8803c3298d523c2495",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12120",
       "triggerID" : "4224ed129067a1550f812e8803c3298d523c2495",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4224ed129067a1550f812e8803c3298d523c2495 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12120) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] godfreyhe commented on a change in pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #14663:
URL: https://github.com/apache/flink/pull/14663#discussion_r558116547



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
##########
@@ -0,0 +1,445 @@
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.runtime.collector.TableFunctionCollector;
+import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
+import org.apache.flink.table.runtime.generated.GeneratedCollector;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.sources.LookupableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import scala.Option;
+
+/**
+ * Base {@link ExecNode} for temporal table join which shares most methods.
+ *
+ * <p>For a lookup join query:
+ *
+ * <pre>
+ * SELECT T.id, T.content, D.age
+ * FROM T JOIN userTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ * ON T.content = concat(D.name, '!') AND D.age = 11 AND T.id = D.id
+ * WHERE D.name LIKE 'Jack%'
+ * </pre>
+ *
+ * <p>The LookupJoin physical node encapsulates the following RelNode tree:
+ *
+ * <pre>
+ *      Join (l.name = r.name)
+ *    /     \
+ * RelNode  Calc (concat(name, "!") as name, name LIKE 'Jack%')
+ *           |
+ *        DimTable (lookup-keys: age=11, id=l.id)
+ *     (age, id, name)
+ * </pre>
+ *
+ * <ul>
+ *   <li>lookupKeys: [$0=11, $1=l.id] ($0 and $1 is the indexes of age and id in dim table)
+ *   <li>calcOnTemporalTable: calc on temporal table rows before join
+ *   <li>joinCondition: join condition on temporal table rows after calc
+ * </ul>
+ *
+ * <p>The workflow of lookup join:
+ *
+ * <p>1) lookup records dimension table using the lookup-keys <br>
+ * 2) project & filter on the lookup-ed records <br>
+ * 3) join left input record and lookup-ed records <br>
+ * 4) only outputs the rows which match to the condition <br>
+ */
+public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {
+
+    private final FlinkJoinType joinType;
+    /**
+     * lookup keys: the key is index in dim table. the value is source of lookup key either
+     * constant or field from right table.
+     */
+    private final Map<Integer, LookupJoinUtil.LookupKey> lookupKeys;
+    /** the reference of temporal table to look up. */
+    private final RelOptTable temporalTable;
+    /** calc performed on rows of temporal table before join. */
+    private final Optional<RexProgram> calcOnTemporalTable;
+    /** join condition except equi-conditions extracted as lookup keys. */
+    private final Optional<RexNode> joinCondition;
+
+    protected CommonExecLookupJoin(
+            FlinkJoinType joinType,
+            Optional<RexNode> joinCondition,
+            // TODO: refactor this into TableSourceTable, once legacy TableSource is removed
+            RelOptTable temporalTable,
+            Optional<RexProgram> calcOnTemporalTable,

Review comment:
       use `@Nullable RexProgram calcOnTemporalTable ` ?
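     The suggestion follows the common Java convention that `Optional` is for return types, while fields and constructor parameters use `@Nullable` plus a plain null check. A minimal sketch of the two styles, using simplified stand-in types rather than the actual planner classes (a local `@Nullable` annotation stands in for `javax.annotation.Nullable` so the sketch is self-contained):

     ```java
     import java.util.Optional;

     // Before: Optional-typed constructor parameter and field, as in the PR
     // under review. The field carries an extra wrapper object.
     class BeforeStyle {
         private final Optional<String> calcOnTemporalTable;

         BeforeStyle(Optional<String> calcOnTemporalTable) {
             this.calcOnTemporalTable = calcOnTemporalTable;
         }

         boolean hasCalc() {
             return calcOnTemporalTable.isPresent();
         }
     }

     // Local stand-in for javax.annotation.Nullable (assumption: the real code
     // would use that annotation, which needs an extra dependency).
     @interface Nullable {}

     // After: @Nullable field with a null check, as the reviewer suggests.
     class AfterStyle {
         private final @Nullable String calcOnTemporalTable;

         AfterStyle(@Nullable String calcOnTemporalTable) {
             this.calcOnTemporalTable = calcOnTemporalTable;
         }

         boolean hasCalc() {
             return calcOnTemporalTable != null;
         }
     }
     ```

     The `@Nullable` form avoids an `Optional` field (which adds an allocation and documents nothing the annotation does not) while keeping the absence of the calc program explicit at the constructor boundary.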

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
##########
@@ -0,0 +1,445 @@
+    protected CommonExecLookupJoin(
+            FlinkJoinType joinType,
+            Optional<RexNode> joinCondition,
+            // TODO: refactor this into TableSourceTable, once legacy TableSource is removed
+            RelOptTable temporalTable,
+            Optional<RexProgram> calcOnTemporalTable,
+            Map<Integer, LookupJoinUtil.LookupKey> lookupKeys,
+            ExecEdge inputEdge,
+            LogicalType outputType,

Review comment:
       RowType
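     The one-word suggestion asks to narrow the parameter from `LogicalType` to `RowType`. A sketch of why the narrower type helps, with simplified stand-ins for Flink's logical type hierarchy (the classes and methods below are illustrative, not the real API):

     ```java
     // Simplified stand-ins for the logical type hierarchy.
     abstract class LogicalType {}

     class RowType extends LogicalType {
         private final int fieldCount;

         RowType(int fieldCount) {
             this.fieldCount = fieldCount;
         }

         int getFieldCount() {
             return fieldCount;
         }
     }

     // With the broad parameter type, the node must downcast and can only
     // fail at runtime if a non-row type slips through.
     class LooseNode {
         int fieldCount(LogicalType outputType) {
             return ((RowType) outputType).getFieldCount(); // ClassCastException risk
         }
     }

     // With the narrowed type, the contract is checked at compile time:
     // callers cannot pass a non-row type at all.
     class StrictNode {
         int fieldCount(RowType outputType) {
             return outputType.getFieldCount();
         }
     }
     ```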

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
##########
@@ -0,0 +1,445 @@
+    protected CommonExecLookupJoin(
+            FlinkJoinType joinType,
+            Optional<RexNode> joinCondition,

Review comment:
       use `@Nullable RexNode joinCondition` ?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
##########
@@ -0,0 +1,445 @@
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.runtime.collector.TableFunctionCollector;
+import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
+import org.apache.flink.table.runtime.generated.GeneratedCollector;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.sources.LookupableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import scala.Option;
+
+/**
+ * Base {@link ExecNode} for temporal table joins, containing the methods shared by the batch
+ * and streaming implementations.
+ *
+ * <p>For a lookup join query:
+ *
+ * <pre>
+ * SELECT T.id, T.content, D.age
+ * FROM T JOIN userTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ * ON T.content = concat(D.name, '!') AND D.age = 11 AND T.id = D.id
+ * WHERE D.name LIKE 'Jack%'
+ * </pre>
+ *
+ * <p>The LookupJoin physical node encapsulates the following RelNode tree:
+ *
+ * <pre>
+ *      Join (l.name = r.name)
+ *    /     \
+ * RelNode  Calc (concat(name, "!") as name, name LIKE 'Jack%')
+ *           |
+ *        DimTable (lookup-keys: age=11, id=l.id)
+ *     (age, id, name)
+ * </pre>
+ *
+ * <ul>
+ *   <li>lookupKeys: [$0=11, $1=l.id] ($0 and $1 are the indexes of age and id in the dim table)
+ *   <li>calcOnTemporalTable: calc on temporal table rows before join
+ *   <li>joinCondition: join condition on temporal table rows after calc
+ * </ul>
+ *
+ * <p>The workflow of lookup join:
+ *
+ * <p>1) look up records in the dimension table using the lookup keys <br>
+ * 2) project & filter the looked-up records <br>
+ * 3) join the left input record with the looked-up records <br>
+ * 4) output only the rows that match the join condition <br>
+ */
+public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {
+
+    private final FlinkJoinType joinType;
+    /**
+     * Lookup keys: the key is the index in the dim table; the value is the source of the
+     * lookup key, either a constant or a field from the right table.
+     */
+    private final Map<Integer, LookupJoinUtil.LookupKey> lookupKeys;
+    /** the reference of temporal table to look up. */
+    private final RelOptTable temporalTable;
+    /** calc performed on rows of temporal table before join. */
+    private final Optional<RexProgram> calcOnTemporalTable;
+    /** join condition except equi-conditions extracted as lookup keys. */
+    private final Optional<RexNode> joinCondition;
+
+    protected CommonExecLookupJoin(
+            FlinkJoinType joinType,
+            Optional<RexNode> joinCondition,
+            // TODO: refactor this into TableSourceTable, once legacy TableSource is removed
+            RelOptTable temporalTable,

Review comment:
       Extract the useful info into another class so that it can be serialized; `RelOptTable` itself cannot be serialized.

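The suggestion above could be sketched roughly as follows: copy only the fields the ExecNode actually needs out of the `RelOptTable` into a plain serializable spec class. The class and field names below are illustrative assumptions, not part of the Flink API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

// Hypothetical serializable replacement for the non-serializable RelOptTable field.
public class TemporalTableSpec implements Serializable {
    private static final long serialVersionUID = 1L;

    // Fully qualified name of the temporal table, e.g. [catalog, database, table].
    private final List<String> qualifiedName;
    // Textual description of the table's row type, enough to rebuild the lookup side.
    private final String rowTypeDescription;

    public TemporalTableSpec(List<String> qualifiedName, String rowTypeDescription) {
        this.qualifiedName = qualifiedName;
        this.rowTypeDescription = rowTypeDescription;
    }

    public List<String> getQualifiedName() {
        return qualifiedName;
    }

    public String getRowTypeDescription() {
        return rowTypeDescription;
    }

    // Round-trips the spec through Java serialization, demonstrating that the
    // extracted info is serializable (which RelOptTable is not).
    public static TemporalTableSpec roundTrip(TemporalTableSpec spec) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(spec);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (TemporalTableSpec) in.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        TemporalTableSpec spec =
                new TemporalTableSpec(
                        Arrays.asList("default_catalog", "default_database", "userTable"),
                        "ROW<age INT, id BIGINT, name STRING>");
        TemporalTableSpec copy = roundTrip(spec);
        System.out.println("round-trip ok: " + String.join(".", copy.getQualifiedName()));
    }
}
```

This keeps the ExecNode free of Calcite planner objects, which matters once exec nodes are serialized into a JSON plan.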
##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
##########
@@ -0,0 +1,445 @@
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.runtime.collector.TableFunctionCollector;
+import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
+import org.apache.flink.table.runtime.generated.GeneratedCollector;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.sources.LookupableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import scala.Option;
+
+/**
+ * Base {@link ExecNode} for temporal table joins, containing the methods shared by the batch
+ * and streaming implementations.
+ *
+ * <p>For a lookup join query:
+ *
+ * <pre>
+ * SELECT T.id, T.content, D.age
+ * FROM T JOIN userTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ * ON T.content = concat(D.name, '!') AND D.age = 11 AND T.id = D.id
+ * WHERE D.name LIKE 'Jack%'
+ * </pre>
+ *
+ * <p>The LookupJoin physical node encapsulates the following RelNode tree:
+ *
+ * <pre>
+ *      Join (l.name = r.name)
+ *    /     \
+ * RelNode  Calc (concat(name, "!") as name, name LIKE 'Jack%')
+ *           |
+ *        DimTable (lookup-keys: age=11, id=l.id)
+ *     (age, id, name)
+ * </pre>
+ *
+ * <ul>
+ *   <li>lookupKeys: [$0=11, $1=l.id] ($0 and $1 are the indexes of age and id in the dim table)
+ *   <li>calcOnTemporalTable: calc on temporal table rows before join
+ *   <li>joinCondition: join condition on temporal table rows after calc
+ * </ul>
+ *
+ * <p>The workflow of lookup join:
+ *
+ * <p>1) look up records in the dimension table using the lookup keys <br>
+ * 2) project & filter the looked-up records <br>
+ * 3) join the left input record with the looked-up records <br>
+ * 4) output only the rows that match the join condition <br>
+ */
+public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {
+
+    private final FlinkJoinType joinType;
+    /**
+     * Lookup keys: the key is the index in the dim table; the value is the source of the
+     * lookup key, either a constant or a field from the right table.
+     */
+    private final Map<Integer, LookupJoinUtil.LookupKey> lookupKeys;
+    /** the reference of temporal table to look up. */
+    private final RelOptTable temporalTable;
+    /** calc performed on rows of temporal table before join. */
+    private final Optional<RexProgram> calcOnTemporalTable;
+    /** join condition except equi-conditions extracted as lookup keys. */
+    private final Optional<RexNode> joinCondition;
+
+    protected CommonExecLookupJoin(
+            FlinkJoinType joinType,
+            Optional<RexNode> joinCondition,
+            // TODO: refactor this into TableSourceTable, once legacy TableSource is removed
+            RelOptTable temporalTable,
+            Optional<RexProgram> calcOnTemporalTable,
+            Map<Integer, LookupJoinUtil.LookupKey> lookupKeys,
+            ExecEdge inputEdge,
+            LogicalType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdge), outputType, description);
+        this.joinType = joinType;
+        this.joinCondition = joinCondition;
+        this.lookupKeys = Collections.unmodifiableMap(lookupKeys);
+        this.temporalTable = temporalTable;
+        this.calcOnTemporalTable = calcOnTemporalTable;
+        // validate whether the node is valid and supported.
+        validate();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0);
+        RowType inputRowType = (RowType) inputNode.getOutputType();
+        RowType tableSourceRowType = FlinkTypeFactory.toLogicalRowType(temporalTable.getRowType());
+        RowType resultRowType = (RowType) getOutputType();
+
+        UserDefinedFunction userDefinedFunction =
+                LookupJoinUtil.getLookupFunction(temporalTable, lookupKeys.keySet());
+        boolean isAsyncEnabled = false;
+        if (userDefinedFunction instanceof AsyncTableFunction) {
+            isAsyncEnabled = true;
+        }
+
+        boolean isLeftOuterJoin = joinType == FlinkJoinType.LEFT;
+        StreamOperatorFactory<RowData> operatorFactory;
+        if (isAsyncEnabled) {
+            operatorFactory =
+                    createAsyncLookupJoin(
+                            planner.getTableConfig(),
+                            lookupKeys,
+                            (AsyncTableFunction) userDefinedFunction,
+                            planner.getRelBuilder(),
+                            inputRowType,
+                            tableSourceRowType,
+                            resultRowType,
+                            isLeftOuterJoin);
+        } else {
+            operatorFactory =
+                    createSyncLookupJoin(
+                            planner.getTableConfig(),
+                            lookupKeys,
+                            (TableFunction) userDefinedFunction,
+                            planner.getRelBuilder(),
+                            inputRowType,
+                            tableSourceRowType,
+                            resultRowType,
+                            isLeftOuterJoin,
+                            planner.getExecEnv().getConfig().isObjectReuseEnabled());
+        }
+
+        Transformation<RowData> inputTransformation = inputNode.translateToPlan(planner);
+        OneInputTransformation<RowData, RowData> ret =
+                new OneInputTransformation<>(
+                        inputTransformation,
+                        getDesc(),
+                        operatorFactory,
+                        InternalTypeInfo.of(resultRowType),
+                        inputTransformation.getParallelism());
+        if (inputsContainSingleton()) {
+            ret.setParallelism(1);
+            ret.setMaxParallelism(1);
+        }
+        return ret;
+    }
+
+    @SuppressWarnings("unchecked")
+    private StreamOperatorFactory<RowData> createAsyncLookupJoin(
+            TableConfig config,
+            Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys,
+            AsyncTableFunction asyncLookupFunction,
+            RelBuilder relBuilder,
+            RowType inputRowType,
+            RowType tableSourceRowType,
+            RowType resultRowType,
+            boolean isLeftOuterJoin) {
+
+        int asyncBufferCapacity =
+                config.getConfiguration()
+                        .getInteger(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY);
+        long asyncTimeout =
+                config.getConfiguration()
+                        .get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_TIMEOUT)
+                        .toMillis();
+
+        DataTypeFactory dataTypeFactory =
+                ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
+
+        List<Integer> lookupKeyIndicesInOrder = new ArrayList<>(allLookupKeys.keySet());
+        lookupKeyIndicesInOrder.sort(Integer::compareTo);
+        LookupJoinCodeGenerator.GeneratedTableFunctionWithDataType<AsyncFunction<RowData, Object>>
+                generatedFuncWithType =
+                        LookupJoinCodeGenerator.generateAsyncLookupFunction(
+                                config,
+                                dataTypeFactory,
+                                inputRowType,
+                                tableSourceRowType,
+                                resultRowType,
+                                allLookupKeys,
+                                lookupKeyIndicesInOrder.stream()
+                                        .mapToInt(Integer::intValue)
+                                        .toArray(),
+                                asyncLookupFunction,
+                                StringUtils.join(temporalTable.getQualifiedName(), "."));
+
+        RowType rightRowType =
+                calcOnTemporalTable
+                        .map(RexProgram::getOutputRowType)
+                        .map(FlinkTypeFactory::toLogicalRowType)
+                        .orElse(tableSourceRowType);
+        // a projection or filter after table source scan
+        GeneratedResultFuture<TableFunctionResultFuture<RowData>> generatedResultFuture =
+                LookupJoinCodeGenerator.generateTableAsyncCollector(
+                        config,
+                        "TableFunctionResultFuture",
+                        inputRowType,
+                        rightRowType,
+                        JavaScalaConversionUtil.toScala(joinCondition));
+
+        DataStructureConverter<?, ?> fetcherConverter =
+                DataStructureConverters.getConverter(generatedFuncWithType.dataType());
+        AsyncFunction<RowData, RowData> asyncFunc;
+        if (calcOnTemporalTable.isPresent()) {
+            // a projection or filter after table source scan
+            GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc =
+                    LookupJoinCodeGenerator.generateCalcMapFunction(
+                            config,
+                            JavaScalaConversionUtil.toScala(calcOnTemporalTable),
+                            tableSourceRowType);
+            asyncFunc =
+                    new AsyncLookupJoinWithCalcRunner(
+                            generatedFuncWithType.tableFunc(),
+                            (DataStructureConverter<RowData, Object>) fetcherConverter,
+                            generatedCalc,
+                            generatedResultFuture,
+                            InternalSerializers.create(rightRowType),
+                            isLeftOuterJoin,
+                            asyncBufferCapacity);
+        } else {
+            // right type is the same as table source row type, because no calc after temporal table
+            asyncFunc =
+                    new AsyncLookupJoinRunner(
+                            generatedFuncWithType.tableFunc(),
+                            (DataStructureConverter<RowData, Object>) fetcherConverter,
+                            generatedResultFuture,
+                            InternalSerializers.create(rightRowType),
+                            isLeftOuterJoin,
+                            asyncBufferCapacity);
+        }
+
+        // force ORDERED output mode for now; optimize it to UNORDERED
+        // when the downstream does not require ordering
+        return new AsyncWaitOperatorFactory(
+                asyncFunc, asyncTimeout, asyncBufferCapacity, AsyncDataStream.OutputMode.ORDERED);
+    }
+
+    @SuppressWarnings("unchecked")
+    private StreamOperatorFactory<RowData> createSyncLookupJoin(
+            TableConfig config,
+            Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys,
+            TableFunction syncLookupFunction,
+            RelBuilder relBuilder,
+            RowType inputRowType,
+            RowType tableSourceRowType,
+            RowType resultRowType,
+            boolean isLeftOuterJoin,
+            boolean isObjectReuseEnabled) {
+
+        DataTypeFactory dataTypeFactory =
+                ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
+
+        List<Integer> lookupKeyIndicesInOrder = new ArrayList<>(allLookupKeys.keySet());
+        Collections.sort(lookupKeyIndicesInOrder, Integer::compareTo);

Review comment:
       nit: extract this code into a helper method that returns `int[]` directly

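The suggested helper could look like the sketch below: one method that sorts the lookup-key indices and returns an `int[]` directly, replacing the `List<Integer>` + `Collections.sort` + `stream().mapToInt(...)` sequence that currently appears at both the sync and async call sites. The class and method names are illustrative, not Flink API.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class LookupKeyIndices {

    // Returns the lookup-key indices (the keys of the lookupKeys map) in ascending order.
    static int[] getOrderedLookupKeyIndices(Set<Integer> lookupKeyIndices) {
        return lookupKeyIndices.stream().mapToInt(Integer::intValue).sorted().toArray();
    }

    public static void main(String[] args) {
        Set<Integer> keys = new HashSet<>(Arrays.asList(2, 0, 1));
        System.out.println(Arrays.toString(getOrderedLookupKeyIndices(keys)));
        // prints [0, 1, 2]
    }
}
```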
##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
##########
@@ -0,0 +1,445 @@
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.runtime.collector.TableFunctionCollector;
+import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
+import org.apache.flink.table.runtime.generated.GeneratedCollector;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.sources.LookupableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import scala.Option;
+
+/**
+ * Base {@link ExecNode} for temporal table joins, containing the methods shared by the batch
+ * and streaming implementations.
+ *
+ * <p>For a lookup join query:
+ *
+ * <pre>
+ * SELECT T.id, T.content, D.age
+ * FROM T JOIN userTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ * ON T.content = concat(D.name, '!') AND D.age = 11 AND T.id = D.id
+ * WHERE D.name LIKE 'Jack%'
+ * </pre>
+ *
+ * <p>The LookupJoin physical node encapsulates the following RelNode tree:
+ *
+ * <pre>
+ *      Join (l.name = r.name)
+ *    /     \
+ * RelNode  Calc (concat(name, "!") as name, name LIKE 'Jack%')
+ *           |
+ *        DimTable (lookup-keys: age=11, id=l.id)
+ *     (age, id, name)
+ * </pre>
+ *
+ * <ul>
+ *   <li>lookupKeys: [$0=11, $1=l.id] ($0 and $1 are the indexes of age and id in the dim table)
+ *   <li>calcOnTemporalTable: calc on temporal table rows before join
+ *   <li>joinCondition: join condition on temporal table rows after calc
+ * </ul>
+ *
+ * <p>The workflow of lookup join:
+ *
+ * <p>1) look up records in the dimension table using the lookup keys <br>
+ * 2) project & filter the looked-up records <br>
+ * 3) join the left input record with the looked-up records <br>
+ * 4) output only the rows that match the join condition <br>
+ */
+public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {
+
+    private final FlinkJoinType joinType;
+    /**
+     * Lookup keys: the key is the index in the dim table; the value is the source of the
+     * lookup key, either a constant or a field from the right table.
+     */
+    private final Map<Integer, LookupJoinUtil.LookupKey> lookupKeys;
+    /** the reference of temporal table to look up. */
+    private final RelOptTable temporalTable;
+    /** calc performed on rows of temporal table before join. */
+    private final Optional<RexProgram> calcOnTemporalTable;
+    /** join condition except equi-conditions extracted as lookup keys. */
+    private final Optional<RexNode> joinCondition;
+
+    protected CommonExecLookupJoin(
+            FlinkJoinType joinType,
+            Optional<RexNode> joinCondition,
+            // TODO: refactor this into TableSourceTable, once legacy TableSource is removed
+            RelOptTable temporalTable,
+            Optional<RexProgram> calcOnTemporalTable,
+            Map<Integer, LookupJoinUtil.LookupKey> lookupKeys,
+            ExecEdge inputEdge,
+            LogicalType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdge), outputType, description);
+        this.joinType = joinType;
+        this.joinCondition = joinCondition;
+        this.lookupKeys = Collections.unmodifiableMap(lookupKeys);
+        this.temporalTable = temporalTable;
+        this.calcOnTemporalTable = calcOnTemporalTable;
+        // validate whether the node is valid and supported.
+        validate();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0);
+        RowType inputRowType = (RowType) inputNode.getOutputType();
+        RowType tableSourceRowType = FlinkTypeFactory.toLogicalRowType(temporalTable.getRowType());
+        RowType resultRowType = (RowType) getOutputType();
+
+        UserDefinedFunction userDefinedFunction =
+                LookupJoinUtil.getLookupFunction(temporalTable, lookupKeys.keySet());
+        boolean isAsyncEnabled = false;
+        if (userDefinedFunction instanceof AsyncTableFunction) {
+            isAsyncEnabled = true;
+        }
+
+        boolean isLeftOuterJoin = joinType == FlinkJoinType.LEFT;
+        StreamOperatorFactory<RowData> operatorFactory;
+        if (isAsyncEnabled) {
+            operatorFactory =
+                    createAsyncLookupJoin(
+                            planner.getTableConfig(),
+                            lookupKeys,
+                            (AsyncTableFunction) userDefinedFunction,
+                            planner.getRelBuilder(),
+                            inputRowType,
+                            tableSourceRowType,
+                            resultRowType,
+                            isLeftOuterJoin);
+        } else {
+            operatorFactory =
+                    createSyncLookupJoin(
+                            planner.getTableConfig(),
+                            lookupKeys,
+                            (TableFunction) userDefinedFunction,
+                            planner.getRelBuilder(),
+                            inputRowType,
+                            tableSourceRowType,
+                            resultRowType,
+                            isLeftOuterJoin,
+                            planner.getExecEnv().getConfig().isObjectReuseEnabled());
+        }
+
+        Transformation<RowData> inputTransformation = inputNode.translateToPlan(planner);
+        OneInputTransformation<RowData, RowData> ret =
+                new OneInputTransformation<>(
+                        inputTransformation,
+                        getDesc(),
+                        operatorFactory,
+                        InternalTypeInfo.of(resultRowType),
+                        inputTransformation.getParallelism());
+        if (inputsContainSingleton()) {
+            ret.setParallelism(1);
+            ret.setMaxParallelism(1);
+        }
+        return ret;
+    }
+
+    @SuppressWarnings("unchecked")
+    private StreamOperatorFactory<RowData> createAsyncLookupJoin(
+            TableConfig config,
+            Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys,
+            AsyncTableFunction asyncLookupFunction,
+            RelBuilder relBuilder,
+            RowType inputRowType,
+            RowType tableSourceRowType,
+            RowType resultRowType,
+            boolean isLeftOuterJoin) {
+
+        int asyncBufferCapacity =
+                config.getConfiguration()
+                        .getInteger(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY);
+        long asyncTimeout =
+                config.getConfiguration()
+                        .get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_TIMEOUT)
+                        .toMillis();
+
+        DataTypeFactory dataTypeFactory =
+                ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
+
+        List<Integer> lookupKeyIndicesInOrder = new ArrayList<>(allLookupKeys.keySet());
+        lookupKeyIndicesInOrder.sort(Integer::compareTo);
+        LookupJoinCodeGenerator.GeneratedTableFunctionWithDataType<AsyncFunction<RowData, Object>>
+                generatedFuncWithType =
+                        LookupJoinCodeGenerator.generateAsyncLookupFunction(
+                                config,
+                                dataTypeFactory,
+                                inputRowType,
+                                tableSourceRowType,
+                                resultRowType,
+                                allLookupKeys,
+                                lookupKeyIndicesInOrder.stream()
+                                        .mapToInt(Integer::intValue)
+                                        .toArray(),
+                                asyncLookupFunction,
+                                StringUtils.join(temporalTable.getQualifiedName(), "."));
+
+        RowType rightRowType =
+                calcOnTemporalTable
+                        .map(RexProgram::getOutputRowType)
+                        .map(FlinkTypeFactory::toLogicalRowType)
+                        .orElse(tableSourceRowType);
+        // a projection or filter after table source scan
+        GeneratedResultFuture<TableFunctionResultFuture<RowData>> generatedResultFuture =
+                LookupJoinCodeGenerator.generateTableAsyncCollector(
+                        config,
+                        "TableFunctionResultFuture",
+                        inputRowType,
+                        rightRowType,
+                        JavaScalaConversionUtil.toScala(joinCondition));
+
+        DataStructureConverter<?, ?> fetcherConverter =
+                DataStructureConverters.getConverter(generatedFuncWithType.dataType());
+        AsyncFunction<RowData, RowData> asyncFunc;
+        if (calcOnTemporalTable.isPresent()) {
+            // a projection or filter after table source scan
+            GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc =
+                    LookupJoinCodeGenerator.generateCalcMapFunction(
+                            config,
+                            JavaScalaConversionUtil.toScala(calcOnTemporalTable),
+                            tableSourceRowType);
+            asyncFunc =
+                    new AsyncLookupJoinWithCalcRunner(
+                            generatedFuncWithType.tableFunc(),
+                            (DataStructureConverter<RowData, Object>) fetcherConverter,
+                            generatedCalc,
+                            generatedResultFuture,
+                            InternalSerializers.create(rightRowType),
+                            isLeftOuterJoin,
+                            asyncBufferCapacity);
+        } else {
+            // right type is the same as table source row type, because no calc after temporal table
+            asyncFunc =
+                    new AsyncLookupJoinRunner(
+                            generatedFuncWithType.tableFunc(),
+                            (DataStructureConverter<RowData, Object>) fetcherConverter,
+                            generatedResultFuture,
+                            InternalSerializers.create(rightRowType),
+                            isLeftOuterJoin,
+                            asyncBufferCapacity);
+        }
+
+        // force ORDERED output mode for now; optimize it to UNORDERED
+        // when the downstream does not require ordering
+        return new AsyncWaitOperatorFactory(
+                asyncFunc, asyncTimeout, asyncBufferCapacity, AsyncDataStream.OutputMode.ORDERED);
+    }
+
+    @SuppressWarnings("unchecked")
+    private StreamOperatorFactory<RowData> createSyncLookupJoin(
+            TableConfig config,
+            Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys,
+            TableFunction syncLookupFunction,
+            RelBuilder relBuilder,
+            RowType inputRowType,
+            RowType tableSourceRowType,
+            RowType resultRowType,
+            boolean isLeftOuterJoin,
+            boolean isObjectReuseEnabled) {
+
+        DataTypeFactory dataTypeFactory =
+                ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
+
+        List<Integer> lookupKeyIndicesInOrder = new ArrayList<>(allLookupKeys.keySet());
+        Collections.sort(lookupKeyIndicesInOrder, Integer::compareTo);
+
+        GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFetcher =
+                LookupJoinCodeGenerator.generateSyncLookupFunction(
+                        config,
+                        dataTypeFactory,
+                        inputRowType,
+                        tableSourceRowType,
+                        resultRowType,
+                        allLookupKeys,
+                        lookupKeyIndicesInOrder.stream().mapToInt(Integer::intValue).toArray(),
+                        syncLookupFunction,
+                        StringUtils.join(temporalTable.getQualifiedName(), "."),
+                        isObjectReuseEnabled);
+
+        RowType rightRowType =
+                calcOnTemporalTable
+                        .map(RexProgram::getOutputRowType)
+                        .map(FlinkTypeFactory::toLogicalRowType)
+                        .orElse(tableSourceRowType);
+        CodeGeneratorContext ctx = new CodeGeneratorContext(config);
+        GeneratedCollector<TableFunctionCollector<RowData>> generatedCollector =
+                LookupJoinCodeGenerator.generateCollector(
+                        ctx,
+                        inputRowType,
+                        rightRowType,
+                        resultRowType,
+                        JavaScalaConversionUtil.toScala(joinCondition),
+                        Option.empty(),

Review comment:
       nit: avoid using the Scala `Option` class here?
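
A small sketch of the alternative the nit suggests: keep `java.util.Optional` on the Java side of the planner and convert to `scala.Option` only inside the Scala code generator. The `describe` helper below is purely illustrative, not a Flink API:

```java
import java.util.Optional;

public class OptionDemo {
    // Java-side API taking java.util.Optional instead of scala.Option;
    // any conversion to Scala would happen inside the Scala code generator.
    static String describe(Optional<String> condition) {
        return condition.map(c -> "condition: " + c).orElse("no condition");
    }

    public static void main(String[] args) {
        System.out.println(describe(Optional.empty()));     // no condition
        System.out.println(describe(Optional.of("a = b"))); // condition: a = b
    }
}
```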

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
##########
@@ -0,0 +1,445 @@
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.runtime.collector.TableFunctionCollector;
+import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
+import org.apache.flink.table.runtime.generated.GeneratedCollector;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.sources.LookupableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import scala.Option;
+
+/**
+ * Base {@link ExecNode} for temporal table join which shares most methods.
+ *
+ * <p>For a lookup join query:
+ *
+ * <pre>
+ * SELECT T.id, T.content, D.age
+ * FROM T JOIN userTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ * ON T.content = concat(D.name, '!') AND D.age = 11 AND T.id = D.id
+ * WHERE D.name LIKE 'Jack%'
+ * </pre>
+ *
+ * <p>The LookupJoin physical node encapsulates the following RelNode tree:
+ *
+ * <pre>
+ *      Join (l.name = r.name)
+ *    /     \
+ * RelNode  Calc (concat(name, "!") as name, name LIKE 'Jack%')
+ *           |
+ *        DimTable (lookup-keys: age=11, id=l.id)
+ *     (age, id, name)
+ * </pre>
+ *
+ * <ul>
+ *   <li>lookupKeys: [$0=11, $1=l.id] ($0 and $1 is the indexes of age and id in dim table)
+ *   <li>calcOnTemporalTable: calc on temporal table rows before join
+ *   <li>joinCondition: join condition on temporal table rows after calc
+ * </ul>
+ *
+ * <p>The workflow of lookup join:
+ *
+ * <p>1) lookup records dimension table using the lookup-keys <br>
+ * 2) project & filter on the lookup-ed records <br>
+ * 3) join left input record and lookup-ed records <br>
+ * 4) only outputs the rows which match to the condition <br>
+ */
+public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {

Review comment:
       nit: there are a lot of warnings because many classes are missing their type parameters
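
The warning the nit points at comes from using raw generic types (e.g. `TableFunction` instead of `TableFunction<RowData>`). A minimal stand-in (not Flink's actual class) showing the raw vs. parameterized usage:

```java
import java.util.ArrayList;
import java.util.List;

public class RawTypeDemo {
    // Stand-in for a generic class like TableFunction<T>.
    static class Func<T> {
        final List<T> collected = new ArrayList<>();
        void collect(T value) { collected.add(value); }
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    static int rawUsage() {
        Func raw = new Func();   // raw type: "unchecked" warning without the suppression
        raw.collect("row");
        return raw.collected.size();
    }

    static int typedUsage() {
        Func<String> typed = new Func<>(); // parameterized: no warning
        typed.collect("row");
        return typed.collected.size();
    }

    public static void main(String[] args) {
        System.out.println(rawUsage());
        System.out.println(typedUsage());
    }
}
```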

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
##########
@@ -0,0 +1,445 @@
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.runtime.collector.TableFunctionCollector;
+import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
+import org.apache.flink.table.runtime.generated.GeneratedCollector;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.sources.LookupableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import scala.Option;
+
+/**
+ * Base {@link ExecNode} for temporal table join which shares most methods.
+ *
+ * <p>For a lookup join query:
+ *
+ * <pre>
+ * SELECT T.id, T.content, D.age
+ * FROM T JOIN userTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ * ON T.content = concat(D.name, '!') AND D.age = 11 AND T.id = D.id
+ * WHERE D.name LIKE 'Jack%'
+ * </pre>
+ *
+ * <p>The LookupJoin physical node encapsulates the following RelNode tree:
+ *
+ * <pre>
+ *      Join (l.name = r.name)
+ *    /     \
+ * RelNode  Calc (concat(name, "!") as name, name LIKE 'Jack%')
+ *           |
+ *        DimTable (lookup-keys: age=11, id=l.id)
+ *     (age, id, name)
+ * </pre>
+ *
+ * <ul>
+ *   <li>lookupKeys: [$0=11, $1=l.id] ($0 and $1 is the indexes of age and id in dim table)
+ *   <li>calcOnTemporalTable: calc on temporal table rows before join
+ *   <li>joinCondition: join condition on temporal table rows after calc
+ * </ul>
+ *
+ * <p>The workflow of lookup join:
+ *
+ * <p>1) lookup records dimension table using the lookup-keys <br>
+ * 2) project & filter on the lookup-ed records <br>
+ * 3) join left input record and lookup-ed records <br>
+ * 4) only outputs the rows which match to the condition <br>
+ */
+public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {
+
+    private final FlinkJoinType joinType;
+    /**
+     * lookup keys: the key is index in dim table. the value is source of lookup key either
+     * constant or field from right table.
+     */
+    private final Map<Integer, LookupJoinUtil.LookupKey> lookupKeys;
+    /** the reference of temporal table to look up. */
+    private final RelOptTable temporalTable;
+    /** calc performed on rows of temporal table before join. */
+    private final Optional<RexProgram> calcOnTemporalTable;
+    /** join condition except equi-conditions extracted as lookup keys. */
+    private final Optional<RexNode> joinCondition;
+
+    protected CommonExecLookupJoin(
+            FlinkJoinType joinType,
+            Optional<RexNode> joinCondition,
+            // TODO: refactor this into TableSourceTable, once legacy TableSource is removed
+            RelOptTable temporalTable,
+            Optional<RexProgram> calcOnTemporalTable,
+            Map<Integer, LookupJoinUtil.LookupKey> lookupKeys,
+            ExecEdge inputEdge,
+            LogicalType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdge), outputType, description);
+        this.joinType = joinType;
+        this.joinCondition = joinCondition;
+        this.lookupKeys = Collections.unmodifiableMap(lookupKeys);
+        this.temporalTable = temporalTable;
+        this.calcOnTemporalTable = calcOnTemporalTable;
+        // validate whether the node is valid and supported.
+        validate();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0);
+        RowType inputRowType = (RowType) inputNode.getOutputType();
+        RowType tableSourceRowType = FlinkTypeFactory.toLogicalRowType(temporalTable.getRowType());
+        RowType resultRowType = (RowType) getOutputType();
+
+        UserDefinedFunction userDefinedFunction =
+                LookupJoinUtil.getLookupFunction(temporalTable, lookupKeys.keySet());
+        boolean isAsyncEnabled = false;
+        if (userDefinedFunction instanceof AsyncTableFunction) {
+            isAsyncEnabled = true;
+        }
+
+        boolean isLeftOuterJoin = joinType == FlinkJoinType.LEFT;
+        StreamOperatorFactory<RowData> operatorFactory;
+        if (isAsyncEnabled) {
+            operatorFactory =
+                    createAsyncLookupJoin(
+                            planner.getTableConfig(),
+                            lookupKeys,
+                            (AsyncTableFunction) userDefinedFunction,
+                            planner.getRelBuilder(),
+                            inputRowType,
+                            tableSourceRowType,
+                            resultRowType,
+                            isLeftOuterJoin);
+        } else {
+            operatorFactory =
+                    createSyncLookupJoin(
+                            planner.getTableConfig(),
+                            lookupKeys,
+                            (TableFunction) userDefinedFunction,
+                            planner.getRelBuilder(),
+                            inputRowType,
+                            tableSourceRowType,
+                            resultRowType,
+                            isLeftOuterJoin,
+                            planner.getExecEnv().getConfig().isObjectReuseEnabled());
+        }
+
+        Transformation<RowData> inputTransformation = inputNode.translateToPlan(planner);
+        OneInputTransformation<RowData, RowData> ret =
+                new OneInputTransformation<>(
+                        inputTransformation,
+                        getDesc(),
+                        operatorFactory,
+                        InternalTypeInfo.of(resultRowType),
+                        inputTransformation.getParallelism());
+        if (inputsContainSingleton()) {
+            ret.setParallelism(1);
+            ret.setMaxParallelism(1);
+        }
+        return ret;
+    }
+
+    @SuppressWarnings("unchecked")
+    private StreamOperatorFactory<RowData> createAsyncLookupJoin(
+            TableConfig config,
+            Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys,
+            AsyncTableFunction asyncLookupFunction,
+            RelBuilder relBuilder,
+            RowType inputRowType,
+            RowType tableSourceRowType,
+            RowType resultRowType,
+            boolean isLeftOuterJoin) {
+
+        int asyncBufferCapacity =
+                config.getConfiguration()
+                        .getInteger(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY);
+        long asyncTimeout =
+                config.getConfiguration()
+                        .get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_TIMEOUT)
+                        .toMillis();
+
+        DataTypeFactory dataTypeFactory =
+                ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
+
+        List<Integer> lookupKeyIndicesInOrder = new ArrayList<>(allLookupKeys.keySet());
+        lookupKeyIndicesInOrder.sort(Integer::compareTo);
+        LookupJoinCodeGenerator.GeneratedTableFunctionWithDataType<AsyncFunction<RowData, Object>>
+                generatedFuncWithType =
+                        LookupJoinCodeGenerator.generateAsyncLookupFunction(
+                                config,
+                                dataTypeFactory,
+                                inputRowType,
+                                tableSourceRowType,
+                                resultRowType,
+                                allLookupKeys,
+                                lookupKeyIndicesInOrder.stream()
+                                        .mapToInt(Integer::intValue)
+                                        .toArray(),
+                                asyncLookupFunction,
+                                StringUtils.join(temporalTable.getQualifiedName(), "."));
+
+        RowType rightRowType =
+                calcOnTemporalTable
+                        .map(RexProgram::getOutputRowType)
+                        .map(FlinkTypeFactory::toLogicalRowType)
+                        .orElse(tableSourceRowType);
+        // a projection or filter after table source scan
+        GeneratedResultFuture<TableFunctionResultFuture<RowData>> generatedResultFuture =
+                LookupJoinCodeGenerator.generateTableAsyncCollector(
+                        config,
+                        "TableFunctionResultFuture",
+                        inputRowType,
+                        rightRowType,
+                        JavaScalaConversionUtil.toScala(joinCondition));
+
+        DataStructureConverter<?, ?> fetcherConverter =
+                DataStructureConverters.getConverter(generatedFuncWithType.dataType());
+        AsyncFunction<RowData, RowData> asyncFunc;
+        if (calcOnTemporalTable.isPresent()) {
+            // a projection or filter after table source scan
+            GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc =
+                    LookupJoinCodeGenerator.generateCalcMapFunction(
+                            config,
+                            JavaScalaConversionUtil.toScala(calcOnTemporalTable),
+                            tableSourceRowType);
+            asyncFunc =
+                    new AsyncLookupJoinWithCalcRunner(
+                            generatedFuncWithType.tableFunc(),
+                            (DataStructureConverter<RowData, Object>) fetcherConverter,
+                            generatedCalc,
+                            generatedResultFuture,
+                            InternalSerializers.create(rightRowType),
+                            isLeftOuterJoin,
+                            asyncBufferCapacity);
+        } else {
+            // right type is the same as table source row type, because no calc after temporal table
+            asyncFunc =
+                    new AsyncLookupJoinRunner(
+                            generatedFuncWithType.tableFunc(),
+                            (DataStructureConverter<RowData, Object>) fetcherConverter,
+                            generatedResultFuture,
+                            InternalSerializers.create(rightRowType),
+                            isLeftOuterJoin,
+                            asyncBufferCapacity);
+        }
+
+        // force ORDERED output mode currently, optimize it to UNORDERED
+        // when the downstream do not need orderness
+        return new AsyncWaitOperatorFactory(
+                asyncFunc, asyncTimeout, asyncBufferCapacity, AsyncDataStream.OutputMode.ORDERED);
+    }
+
+    @SuppressWarnings("unchecked")
+    private StreamOperatorFactory<RowData> createSyncLookupJoin(
+            TableConfig config,
+            Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys,
+            TableFunction syncLookupFunction,
+            RelBuilder relBuilder,
+            RowType inputRowType,
+            RowType tableSourceRowType,
+            RowType resultRowType,
+            boolean isLeftOuterJoin,
+            boolean isObjectReuseEnabled) {
+
+        DataTypeFactory dataTypeFactory =
+                ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
+
+        List<Integer> lookupKeyIndicesInOrder = new ArrayList<>(allLookupKeys.keySet());
+        Collections.sort(lookupKeyIndicesInOrder, Integer::compareTo);
+
+        GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFetcher =
+                LookupJoinCodeGenerator.generateSyncLookupFunction(
+                        config,
+                        dataTypeFactory,
+                        inputRowType,
+                        tableSourceRowType,
+                        resultRowType,
+                        allLookupKeys,
+                        lookupKeyIndicesInOrder.stream().mapToInt(Integer::intValue).toArray(),
+                        syncLookupFunction,
+                        StringUtils.join(temporalTable.getQualifiedName(), "."),
+                        isObjectReuseEnabled);
+
+        RowType rightRowType =
+                calcOnTemporalTable
+                        .map(RexProgram::getOutputRowType)
+                        .map(FlinkTypeFactory::toLogicalRowType)
+                        .orElse(tableSourceRowType);
+        CodeGeneratorContext ctx = new CodeGeneratorContext(config);
+        GeneratedCollector<TableFunctionCollector<RowData>> generatedCollector =
+                LookupJoinCodeGenerator.generateCollector(
+                        ctx,
+                        inputRowType,
+                        rightRowType,
+                        resultRowType,
+                        JavaScalaConversionUtil.toScala(joinCondition),
+                        Option.empty(),
+                        true);
+        ProcessFunction<RowData, RowData> processFunc;
+        if (calcOnTemporalTable.isPresent()) {
+            // a projection or filter after table source scan
+            GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc =
+                    LookupJoinCodeGenerator.generateCalcMapFunction(
+                            config,
+                            JavaScalaConversionUtil.toScala(calcOnTemporalTable),
+                            tableSourceRowType);
+
+            processFunc =
+                    new LookupJoinWithCalcRunner(
+                            generatedFetcher,
+                            generatedCalc,
+                            generatedCollector,
+                            isLeftOuterJoin,
+                            rightRowType.getFieldCount());
+        } else {
+            // right type is the same as table source row type, because no calc after temporal table
+            processFunc =
+                    new LookupJoinRunner(
+                            generatedFetcher,
+                            generatedCollector,
+                            isLeftOuterJoin,
+                            rightRowType.getFieldCount());
+        }
+        return SimpleOperatorFactory.of(new ProcessOperator(processFunc));
+    }
+
+    // ----------------------------------------------------------------------------------------
+    //                                       Validation
+    // ----------------------------------------------------------------------------------------
+
+    private void validate() {
+
+        // validate table source and function implementation first
+        validateTableSource();
+
+        // check join on all fields of PRIMARY KEY or (UNIQUE) INDEX
+        if (lookupKeys.isEmpty()) {

Review comment:
       is the `join key types validation` missing here?
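
A hypothetical sketch of the kind of check the comment asks about: each lookup key's type must match the type of the corresponding dimension-table column. Types are modeled as plain strings here for illustration; the real check would compare `LogicalType`s:

```java
import java.util.HashMap;
import java.util.Map;

public class JoinKeyTypeCheck {
    // Compare the type of each lookup key against the dim-table column it targets,
    // failing fast on any mismatch (map key = column index in the dim table).
    static void validateJoinKeyTypes(
            Map<Integer, String> lookupKeyTypes, Map<Integer, String> dimColumnTypes) {
        for (Map.Entry<Integer, String> e : lookupKeyTypes.entrySet()) {
            String dimType = dimColumnTypes.get(e.getKey());
            if (!e.getValue().equals(dimType)) {
                throw new IllegalStateException(
                        "Join key type mismatch on column " + e.getKey()
                                + ": " + e.getValue() + " vs " + dimType);
            }
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> keys = new HashMap<>();
        keys.put(1, "BIGINT");
        Map<Integer, String> dim = new HashMap<>();
        dim.put(1, "BIGINT");
        validateJoinKeyTypes(keys, dim); // matching types: no exception
        System.out.println("ok");
    }
}
```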

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
##########
@@ -0,0 +1,445 @@
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.runtime.collector.TableFunctionCollector;
+import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
+import org.apache.flink.table.runtime.generated.GeneratedCollector;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.sources.LookupableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import scala.Option;
+
+/**
+ * Base {@link ExecNode} for temporal table join which shares most methods.
+ *
+ * <p>For a lookup join query:
+ *
+ * <pre>
+ * SELECT T.id, T.content, D.age
+ * FROM T JOIN userTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ * ON T.content = concat(D.name, '!') AND D.age = 11 AND T.id = D.id
+ * WHERE D.name LIKE 'Jack%'
+ * </pre>
+ *
+ * <p>The LookupJoin physical node encapsulates the following RelNode tree:
+ *
+ * <pre>
+ *      Join (l.name = r.name)
+ *    /     \
+ * RelNode  Calc (concat(name, "!") as name, name LIKE 'Jack%')
+ *           |
+ *        DimTable (lookup-keys: age=11, id=l.id)
+ *     (age, id, name)
+ * </pre>
+ *
+ * <ul>
+ *   <li>lookupKeys: [$0=11, $1=l.id] ($0 and $1 is the indexes of age and id in dim table)
+ *   <li>calcOnTemporalTable: calc on temporal table rows before join
+ *   <li>joinCondition: join condition on temporal table rows after calc
+ * </ul>
+ *
+ * <p>The workflow of lookup join:
+ *
+ * <p>1) lookup records dimension table using the lookup-keys <br>
+ * 2) project & filter on the lookup-ed records <br>
+ * 3) join left input record and lookup-ed records <br>
+ * 4) only outputs the rows which match to the condition <br>
+ */
+public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {
+
+    private final FlinkJoinType joinType;
+    /**
+     * lookup keys: the key is index in dim table. the value is source of lookup key either
+     * constant or field from right table.
+     */
+    private final Map<Integer, LookupJoinUtil.LookupKey> lookupKeys;
+    /** the reference of temporal table to look up. */
+    private final RelOptTable temporalTable;
+    /** calc performed on rows of temporal table before join. */
+    private final Optional<RexProgram> calcOnTemporalTable;
+    /** join condition except equi-conditions extracted as lookup keys. */
+    private final Optional<RexNode> joinCondition;
+
+    protected CommonExecLookupJoin(
+            FlinkJoinType joinType,
+            Optional<RexNode> joinCondition,
+            // TODO: refactor this into TableSourceTable, once legacy TableSource is removed
+            RelOptTable temporalTable,
+            Optional<RexProgram> calcOnTemporalTable,
+            Map<Integer, LookupJoinUtil.LookupKey> lookupKeys,
+            ExecEdge inputEdge,
+            LogicalType outputType,
+            String description) {
+        super(Collections.singletonList(inputEdge), outputType, description);
+        this.joinType = joinType;
+        this.joinCondition = joinCondition;
+        this.lookupKeys = Collections.unmodifiableMap(lookupKeys);
+        this.temporalTable = temporalTable;
+        this.calcOnTemporalTable = calcOnTemporalTable;
+        // validate whether the node is valid and supported.
+        validate();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0);
+        RowType inputRowType = (RowType) inputNode.getOutputType();
+        RowType tableSourceRowType = FlinkTypeFactory.toLogicalRowType(temporalTable.getRowType());
+        RowType resultRowType = (RowType) getOutputType();
+
+        UserDefinedFunction userDefinedFunction =
+                LookupJoinUtil.getLookupFunction(temporalTable, lookupKeys.keySet());
+        boolean isAsyncEnabled = false;
+        if (userDefinedFunction instanceof AsyncTableFunction) {
+            isAsyncEnabled = true;
+        }
+
+        boolean isLeftOuterJoin = joinType == FlinkJoinType.LEFT;
+        StreamOperatorFactory<RowData> operatorFactory;
+        if (isAsyncEnabled) {
+            operatorFactory =
+                    createAsyncLookupJoin(
+                            planner.getTableConfig(),
+                            lookupKeys,
+                            (AsyncTableFunction) userDefinedFunction,
+                            planner.getRelBuilder(),
+                            inputRowType,
+                            tableSourceRowType,
+                            resultRowType,
+                            isLeftOuterJoin);
+        } else {
+            operatorFactory =
+                    createSyncLookupJoin(
+                            planner.getTableConfig(),
+                            lookupKeys,
+                            (TableFunction) userDefinedFunction,
+                            planner.getRelBuilder(),
+                            inputRowType,
+                            tableSourceRowType,
+                            resultRowType,
+                            isLeftOuterJoin,
+                            planner.getExecEnv().getConfig().isObjectReuseEnabled());
+        }
+
+        Transformation<RowData> inputTransformation = inputNode.translateToPlan(planner);
+        OneInputTransformation<RowData, RowData> ret =
+                new OneInputTransformation<>(
+                        inputTransformation,
+                        getDesc(),
+                        operatorFactory,
+                        InternalTypeInfo.of(resultRowType),
+                        inputTransformation.getParallelism());
+        if (inputsContainSingleton()) {
+            ret.setParallelism(1);
+            ret.setMaxParallelism(1);
+        }
+        return ret;
+    }
+
+    @SuppressWarnings("unchecked")
+    private StreamOperatorFactory<RowData> createAsyncLookupJoin(
+            TableConfig config,
+            Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys,
+            AsyncTableFunction asyncLookupFunction,
+            RelBuilder relBuilder,
+            RowType inputRowType,
+            RowType tableSourceRowType,
+            RowType resultRowType,
+            boolean isLeftOuterJoin) {
+
+        int asyncBufferCapacity =
+                config.getConfiguration()
+                        .getInteger(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY);
+        long asyncTimeout =
+                config.getConfiguration()
+                        .get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_TIMEOUT)
+                        .toMillis();
+
+        DataTypeFactory dataTypeFactory =
+                ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
+
+        List<Integer> lookupKeyIndicesInOrder = new ArrayList<>(allLookupKeys.keySet());
+        lookupKeyIndicesInOrder.sort(Integer::compareTo);
+        LookupJoinCodeGenerator.GeneratedTableFunctionWithDataType<AsyncFunction<RowData, Object>>
+                generatedFuncWithType =
+                        LookupJoinCodeGenerator.generateAsyncLookupFunction(
+                                config,
+                                dataTypeFactory,
+                                inputRowType,
+                                tableSourceRowType,
+                                resultRowType,
+                                allLookupKeys,
+                                lookupKeyIndicesInOrder.stream()
+                                        .mapToInt(Integer::intValue)
+                                        .toArray(),
+                                asyncLookupFunction,
+                                StringUtils.join(temporalTable.getQualifiedName(), "."));
+
+        RowType rightRowType =
+                calcOnTemporalTable
+                        .map(RexProgram::getOutputRowType)
+                        .map(FlinkTypeFactory::toLogicalRowType)
+                        .orElse(tableSourceRowType);
+        // a projection or filter after table source scan
+        GeneratedResultFuture<TableFunctionResultFuture<RowData>> generatedResultFuture =
+                LookupJoinCodeGenerator.generateTableAsyncCollector(
+                        config,
+                        "TableFunctionResultFuture",
+                        inputRowType,
+                        rightRowType,
+                        JavaScalaConversionUtil.toScala(joinCondition));
+
+        DataStructureConverter<?, ?> fetcherConverter =
+                DataStructureConverters.getConverter(generatedFuncWithType.dataType());
+        AsyncFunction<RowData, RowData> asyncFunc;
+        if (calcOnTemporalTable.isPresent()) {
+            // a projection or filter after table source scan
+            GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc =
+                    LookupJoinCodeGenerator.generateCalcMapFunction(
+                            config,
+                            JavaScalaConversionUtil.toScala(calcOnTemporalTable),
+                            tableSourceRowType);
+            asyncFunc =
+                    new AsyncLookupJoinWithCalcRunner(
+                            generatedFuncWithType.tableFunc(),
+                            (DataStructureConverter<RowData, Object>) fetcherConverter,
+                            generatedCalc,
+                            generatedResultFuture,
+                            InternalSerializers.create(rightRowType),
+                            isLeftOuterJoin,
+                            asyncBufferCapacity);
+        } else {
+            // right type is the same as table source row type, because no calc after temporal table
+            asyncFunc =
+                    new AsyncLookupJoinRunner(
+                            generatedFuncWithType.tableFunc(),
+                            (DataStructureConverter<RowData, Object>) fetcherConverter,
+                            generatedResultFuture,
+                            InternalSerializers.create(rightRowType),
+                            isLeftOuterJoin,
+                            asyncBufferCapacity);
+        }
+
+        // force ORDERED output mode for now; this can be optimized to UNORDERED
+        // when the downstream does not require ordering
+        return new AsyncWaitOperatorFactory(
+                asyncFunc, asyncTimeout, asyncBufferCapacity, AsyncDataStream.OutputMode.ORDERED);
+    }
+
+    @SuppressWarnings("unchecked")
+    private StreamOperatorFactory<RowData> createSyncLookupJoin(
+            TableConfig config,
+            Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys,
+            TableFunction syncLookupFunction,
+            RelBuilder relBuilder,
+            RowType inputRowType,
+            RowType tableSourceRowType,
+            RowType resultRowType,
+            boolean isLeftOuterJoin,
+            boolean isObjectReuseEnabled) {
+
+        DataTypeFactory dataTypeFactory =
+                ShortcutUtils.unwrapContext(relBuilder).getCatalogManager().getDataTypeFactory();
+
+        List<Integer> lookupKeyIndicesInOrder = new ArrayList<>(allLookupKeys.keySet());
+        lookupKeyIndicesInOrder.sort(Integer::compareTo);
+
+        GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedFetcher =
+                LookupJoinCodeGenerator.generateSyncLookupFunction(
+                        config,
+                        dataTypeFactory,
+                        inputRowType,
+                        tableSourceRowType,
+                        resultRowType,
+                        allLookupKeys,
+                        lookupKeyIndicesInOrder.stream().mapToInt(Integer::intValue).toArray(),
+                        syncLookupFunction,
+                        StringUtils.join(temporalTable.getQualifiedName(), "."),
+                        isObjectReuseEnabled);
+
+        RowType rightRowType =
+                calcOnTemporalTable
+                        .map(RexProgram::getOutputRowType)
+                        .map(FlinkTypeFactory::toLogicalRowType)
+                        .orElse(tableSourceRowType);
+        CodeGeneratorContext ctx = new CodeGeneratorContext(config);
+        GeneratedCollector<TableFunctionCollector<RowData>> generatedCollector =
+                LookupJoinCodeGenerator.generateCollector(
+                        ctx,
+                        inputRowType,
+                        rightRowType,
+                        resultRowType,
+                        JavaScalaConversionUtil.toScala(joinCondition),
+                        Option.empty(),
+                        true);
+        ProcessFunction<RowData, RowData> processFunc;
+        if (calcOnTemporalTable.isPresent()) {
+            // a projection or filter after table source scan
+            GeneratedFunction<FlatMapFunction<RowData, RowData>> generatedCalc =
+                    LookupJoinCodeGenerator.generateCalcMapFunction(
+                            config,
+                            JavaScalaConversionUtil.toScala(calcOnTemporalTable),
+                            tableSourceRowType);
+
+            processFunc =
+                    new LookupJoinWithCalcRunner(
+                            generatedFetcher,
+                            generatedCalc,
+                            generatedCollector,
+                            isLeftOuterJoin,
+                            rightRowType.getFieldCount());
+        } else {
+            // right type is the same as table source row type, because no calc after temporal table
+            processFunc =
+                    new LookupJoinRunner(
+                            generatedFetcher,
+                            generatedCollector,
+                            isLeftOuterJoin,
+                            rightRowType.getFieldCount());
+        }
+        return SimpleOperatorFactory.of(new ProcessOperator(processFunc));
+    }
+
+    // ----------------------------------------------------------------------------------------
+    //                                       Validation
+    // ----------------------------------------------------------------------------------------
+
+    private void validate() {
+
+        // validate table source and function implementation first
+        validateTableSource();

Review comment:
       `prepareInstance(config, lookupFunction)` is missing?
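For context on the code under review: the quoted `translateToPlanInternal` chooses between the async and sync operator factories purely via an `instanceof` check on the lookup function. A minimal stand-alone sketch of that dispatch, using toy class names rather than Flink's actual `AsyncTableFunction`/`TableFunction`:

```java
// Toy dispatch mirroring the instanceof check in translateToPlanInternal:
// async lookup functions go down the AsyncWaitOperator path, sync ones get
// a ProcessOperator. The classes below are stand-ins, not Flink's types.
abstract class LookupFn {}

class SyncLookupFn extends LookupFn {}

class AsyncLookupFn extends LookupFn {}

public class DispatchSketch {
    static String chooseRunner(LookupFn f) {
        // same shape as: userDefinedFunction instanceof AsyncTableFunction
        return (f instanceof AsyncLookupFn) ? "async" : "sync";
    }
}
```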




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implemention of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14663:
URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12108",
       "triggerID" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4224ed129067a1550f812e8803c3298d523c2495",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12120",
       "triggerID" : "4224ed129067a1550f812e8803c3298d523c2495",
       "triggerType" : "PUSH"
     }, {
       "hash" : "852af6f79074f5d25ef8194963c464feb0038787",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163",
       "triggerID" : "852af6f79074f5d25ef8194963c464feb0038787",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4224ed129067a1550f812e8803c3298d523c2495 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12120) 
   * 852af6f79074f5d25ef8194963c464feb0038787 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] godfreyhe commented on a change in pull request #14663: [FLINK-20925][table-planner-blink] Separate implemention of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #14663:
URL: https://github.com/apache/flink/pull/14663#discussion_r558185519



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
##########
@@ -0,0 +1,445 @@
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.runtime.collector.TableFunctionCollector;
+import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
+import org.apache.flink.table.runtime.generated.GeneratedCollector;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.sources.LookupableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import scala.Option;
+
+/**
+ * Base {@link ExecNode} for temporal table join which shares most methods.
+ *
+ * <p>For a lookup join query:
+ *
+ * <pre>
+ * SELECT T.id, T.content, D.age
+ * FROM T JOIN userTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ * ON T.content = concat(D.name, '!') AND D.age = 11 AND T.id = D.id
+ * WHERE D.name LIKE 'Jack%'
+ * </pre>
+ *
+ * <p>The LookupJoin physical node encapsulates the following RelNode tree:
+ *
+ * <pre>
+ *      Join (l.name = r.name)
+ *    /     \
+ * RelNode  Calc (concat(name, "!") as name, name LIKE 'Jack%')
+ *           |
+ *        DimTable (lookup-keys: age=11, id=l.id)
+ *     (age, id, name)
+ * </pre>
+ *
+ * <ul>
+ *   <li>lookupKeys: [$0=11, $1=l.id] ($0 and $1 are the indexes of age and id in the dim table)
+ *   <li>calcOnTemporalTable: calc on temporal table rows before join
+ *   <li>joinCondition: join condition on temporal table rows after calc
+ * </ul>
+ *
+ * <p>The workflow of lookup join:
+ *
+ * <p>1) look up records in the dimension table using the lookup keys <br>
+ * 2) project and filter the looked-up records <br>
+ * 3) join the left input record with the looked-up records <br>
+ * 4) emit only the rows that match the join condition <br>
+ */
+public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {

Review comment:
       nit: there are a lot of warnings because many classes lack type parameters
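The raw-type warnings mentioned here come from uses such as `AsyncTableFunction` and `TableFunction` without type arguments. A small illustration of the difference, with `java.util.function.Function` standing in for Flink's function classes:

```java
import java.util.function.Function;

// Toy illustration of the reviewer's nit: raw types compile but trigger
// unchecked warnings, while parameterized types are verified statically.
public class RawTypeSketch {

    @SuppressWarnings({"rawtypes", "unchecked"})
    static Object applyRaw(Function f, Object in) {
        // raw Function: the compiler cannot check that 'in' matches f's input type
        return f.apply(in);
    }

    static <I, O> O applyTyped(Function<I, O> f, I in) {
        // parameterized: type-safe at the call site, no unchecked warning
        return f.apply(in);
    }
}
```

Adding the type parameters (e.g. `TableFunction<?>`) removes the warnings without changing behavior.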







[GitHub] [flink] flinkbot commented on pull request #14663: [FLINK-20925][table-planner-blink] Separate implemention of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14663:
URL: https://github.com/apache/flink/pull/14663#issuecomment-760764364


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804 (Fri Jan 15 08:59:49 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
     The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implemention of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14663:
URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12108",
       "triggerID" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4224ed129067a1550f812e8803c3298d523c2495",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12120",
       "triggerID" : "4224ed129067a1550f812e8803c3298d523c2495",
       "triggerType" : "PUSH"
     }, {
       "hash" : "852af6f79074f5d25ef8194963c464feb0038787",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "852af6f79074f5d25ef8194963c464feb0038787",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4224ed129067a1550f812e8803c3298d523c2495 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12120) 
   * 852af6f79074f5d25ef8194963c464feb0038787 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implemention of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14663:
URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12108",
       "triggerID" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4224ed129067a1550f812e8803c3298d523c2495",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12120",
       "triggerID" : "4224ed129067a1550f812e8803c3298d523c2495",
       "triggerType" : "PUSH"
     }, {
       "hash" : "852af6f79074f5d25ef8194963c464feb0038787",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163",
       "triggerID" : "852af6f79074f5d25ef8194963c464feb0038787",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 852af6f79074f5d25ef8194963c464feb0038787 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #14663: [FLINK-20925][table-planner-blink] Separate implemention of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14663:
URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] godfreyhe commented on a change in pull request #14663: [FLINK-20925][table-planner-blink] Separate implemention of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #14663:
URL: https://github.com/apache/flink/pull/14663#discussion_r559352791



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
##########
@@ -58,4 +71,50 @@ public FieldRefLookupKey(int index) {
     private LookupJoinUtil() {
         // no instantiation
     }
+
+    /** Gets LookupFunction from temporal table according to the given lookup keys. */
+    public static UserDefinedFunction getLookupFunction(
+            RelOptTable temporalTable, Collection<Integer> lookupKeys) {
+
+        List<Integer> lookupKeyIndicesInOrder = new ArrayList<>(lookupKeys);
+        lookupKeyIndicesInOrder.sort(Integer::compareTo);

Review comment:
       nit: `CommonExecLookupJoin#getOrderedLookupKeys` can be moved into `LookupJoinUtil` and then this method can reuse it.
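The suggested shared utility might look roughly like the following; the method name `getOrderedLookupKeys` is taken from the review comment, while the signature and class shape are assumptions for illustration:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of the ordering helper the review proposes moving
// into LookupJoinUtil, so both the code generator and getLookupFunction
// can reuse it instead of sorting the key indices in two places.
public final class OrderedLookupKeysSketch {

    private OrderedLookupKeysSketch() {
        // no instantiation, mirroring LookupJoinUtil
    }

    /** Returns the dim-table lookup key indices in ascending order. */
    public static int[] getOrderedLookupKeys(Collection<Integer> lookupKeys) {
        List<Integer> ordered = new ArrayList<>(lookupKeys);
        ordered.sort(Integer::compareTo);
        return ordered.stream().mapToInt(Integer::intValue).toArray();
    }
}
```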







[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implemention of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14663:
URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12108",
       "triggerID" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4224ed129067a1550f812e8803c3298d523c2495",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12120",
       "triggerID" : "4224ed129067a1550f812e8803c3298d523c2495",
       "triggerType" : "PUSH"
     }, {
       "hash" : "852af6f79074f5d25ef8194963c464feb0038787",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163",
       "triggerID" : "852af6f79074f5d25ef8194963c464feb0038787",
       "triggerType" : "PUSH"
     }, {
       "hash" : "606f33587dd148ad9e2dc4c9fb91986e4c995bc3",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "606f33587dd148ad9e2dc4c9fb91986e4c995bc3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 852af6f79074f5d25ef8194963c464feb0038787 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163) 
   * 606f33587dd148ad9e2dc4c9fb91986e4c995bc3 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] godfreyhe commented on a change in pull request #14663: [FLINK-20925][table-planner-blink] Separate implemention of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #14663:
URL: https://github.com/apache/flink/pull/14663#discussion_r558122588



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java
##########
@@ -0,0 +1,445 @@
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.AsyncDataStream;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.async.AsyncFunction;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.runtime.collector.TableFunctionCollector;
+import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
+import org.apache.flink.table.runtime.generated.GeneratedCollector;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
+import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
+import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.sources.LookupableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import scala.Option;
+
+/**
+ * Base {@link ExecNode} for temporal table joins, which shares most of the common methods.
+ *
+ * <p>For a lookup join query:
+ *
+ * <pre>
+ * SELECT T.id, T.content, D.age
+ * FROM T JOIN userTable FOR SYSTEM_TIME AS OF T.proctime AS D
+ * ON T.content = concat(D.name, '!') AND D.age = 11 AND T.id = D.id
+ * WHERE D.name LIKE 'Jack%'
+ * </pre>
+ *
+ * <p>The LookupJoin physical node encapsulates the following RelNode tree:
+ *
+ * <pre>
+ *      Join (l.name = r.name)
+ *    /     \
+ * RelNode  Calc (concat(name, "!") as name, name LIKE 'Jack%')
+ *           |
+ *        DimTable (lookup-keys: age=11, id=l.id)
+ *     (age, id, name)
+ * </pre>
+ *
+ * <ul>
+ *   <li>lookupKeys: [$0=11, $1=l.id] ($0 and $1 are the indexes of age and id in the dim table)
+ *   <li>calcOnTemporalTable: calc on temporal table rows before join
+ *   <li>joinCondition: join condition on temporal table rows after calc
+ * </ul>
+ *
+ * <p>The workflow of lookup join:
+ *
+ * <p>1) look up records in the dimension table using the lookup keys <br>
+ * 2) project & filter the looked-up records <br>
+ * 3) join each left input record with the looked-up records <br>
+ * 4) output only the rows that match the join condition <br>
+ */
+public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {
+
+    private final FlinkJoinType joinType;
+    /**
+     * Lookup keys: the map key is the field index in the dim table; the value is the source of
+     * the lookup key, either a constant or a field from the left (input) table.
+     */
+    private final Map<Integer, LookupJoinUtil.LookupKey> lookupKeys;
+    /** The temporal table to look up. */
+    private final RelOptTable temporalTable;
+    /** Calc performed on the temporal table rows before the join. */
+    private final Optional<RexProgram> calcOnTemporalTable;
+    /** Join condition, excluding the equi-conditions extracted as lookup keys. */
+    private final Optional<RexNode> joinCondition;
+
+    protected CommonExecLookupJoin(
+            FlinkJoinType joinType,
+            Optional<RexNode> joinCondition,
+            // TODO: refactor this into TableSourceTable, once legacy TableSource is removed
+            RelOptTable temporalTable,

Review comment:
       Extract the useful info into another class; `RelOptTable` cannot be serialized.
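A minimal, hypothetical sketch of what such an extracted descriptor could look like: a plain serializable class that carries only the information the exec node needs from the `RelOptTable`. The class name (`TemporalTableSpec`) and its fields are illustrative assumptions, not Flink API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

/**
 * Hypothetical descriptor holding only the serializable information an exec
 * node needs from the temporal table, instead of the non-serializable
 * RelOptTable itself. Names here are illustrative, not Flink API.
 */
public class TemporalTableSpec implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Fully qualified table name, e.g. [catalog, database, table]. */
    private final List<String> qualifiedName;
    /** Column names of the temporal (dimension) table. */
    private final List<String> fieldNames;

    public TemporalTableSpec(List<String> qualifiedName, List<String> fieldNames) {
        this.qualifiedName = qualifiedName;
        this.fieldNames = fieldNames;
    }

    public List<String> getQualifiedName() {
        return qualifiedName;
    }

    public List<String> getFieldNames() {
        return fieldNames;
    }

    /** Round-trips a spec through Java serialization to show it survives. */
    public static TemporalTableSpec roundTrip(TemporalTableSpec spec) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(spec);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (TemporalTableSpec) in.readObject();
        }
    }
}
```

The design point is simply that the exec node would store this spec and re-resolve the actual table from the catalog when needed, rather than keeping a planner-internal object alive across serialization.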







[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14663:
URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12108",
       "triggerID" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12108) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14663:
URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12108",
       "triggerID" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4224ed129067a1550f812e8803c3298d523c2495",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4224ed129067a1550f812e8803c3298d523c2495",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12108) 
   * 4224ed129067a1550f812e8803c3298d523c2495 UNKNOWN
   





[GitHub] [flink] godfreyhe closed pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
godfreyhe closed pull request #14663:
URL: https://github.com/apache/flink/pull/14663


   





[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14663:
URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12108",
       "triggerID" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4224ed129067a1550f812e8803c3298d523c2495",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12120",
       "triggerID" : "4224ed129067a1550f812e8803c3298d523c2495",
       "triggerType" : "PUSH"
     }, {
       "hash" : "852af6f79074f5d25ef8194963c464feb0038787",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163",
       "triggerID" : "852af6f79074f5d25ef8194963c464feb0038787",
       "triggerType" : "PUSH"
     }, {
       "hash" : "606f33587dd148ad9e2dc4c9fb91986e4c995bc3",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12168",
       "triggerID" : "606f33587dd148ad9e2dc4c9fb91986e4c995bc3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ca8374f80e4fbfebe7815dbd2c3182c79256f1cb",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12170",
       "triggerID" : "ca8374f80e4fbfebe7815dbd2c3182c79256f1cb",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 606f33587dd148ad9e2dc4c9fb91986e4c995bc3 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12168) 
   * ca8374f80e4fbfebe7815dbd2c3182c79256f1cb Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12170) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14663:
URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12108",
       "triggerID" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4224ed129067a1550f812e8803c3298d523c2495",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12120",
       "triggerID" : "4224ed129067a1550f812e8803c3298d523c2495",
       "triggerType" : "PUSH"
     }, {
       "hash" : "852af6f79074f5d25ef8194963c464feb0038787",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163",
       "triggerID" : "852af6f79074f5d25ef8194963c464feb0038787",
       "triggerType" : "PUSH"
     }, {
       "hash" : "606f33587dd148ad9e2dc4c9fb91986e4c995bc3",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12168",
       "triggerID" : "606f33587dd148ad9e2dc4c9fb91986e4c995bc3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ca8374f80e4fbfebe7815dbd2c3182c79256f1cb",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12170",
       "triggerID" : "ca8374f80e4fbfebe7815dbd2c3182c79256f1cb",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 852af6f79074f5d25ef8194963c464feb0038787 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163) 
   * 606f33587dd148ad9e2dc4c9fb91986e4c995bc3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12168) 
   * ca8374f80e4fbfebe7815dbd2c3182c79256f1cb Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12170) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14663:
URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12108",
       "triggerID" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4224ed129067a1550f812e8803c3298d523c2495",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12120",
       "triggerID" : "4224ed129067a1550f812e8803c3298d523c2495",
       "triggerType" : "PUSH"
     }, {
       "hash" : "852af6f79074f5d25ef8194963c464feb0038787",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163",
       "triggerID" : "852af6f79074f5d25ef8194963c464feb0038787",
       "triggerType" : "PUSH"
     }, {
       "hash" : "606f33587dd148ad9e2dc4c9fb91986e4c995bc3",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12168",
       "triggerID" : "606f33587dd148ad9e2dc4c9fb91986e4c995bc3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ca8374f80e4fbfebe7815dbd2c3182c79256f1cb",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ca8374f80e4fbfebe7815dbd2c3182c79256f1cb",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 852af6f79074f5d25ef8194963c464feb0038787 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163) 
   * 606f33587dd148ad9e2dc4c9fb91986e4c995bc3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12168) 
   * ca8374f80e4fbfebe7815dbd2c3182c79256f1cb UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #14663: [FLINK-20925][table-planner-blink] Separate implementation of StreamExecLookupJoin and BatchExecLookupJoin

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14663:
URL: https://github.com/apache/flink/pull/14663#issuecomment-760769392


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12108",
       "triggerID" : "1bac0fae97e00f2c2f5b2d9b0ad7f1ae97845804",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4224ed129067a1550f812e8803c3298d523c2495",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12120",
       "triggerID" : "4224ed129067a1550f812e8803c3298d523c2495",
       "triggerType" : "PUSH"
     }, {
       "hash" : "852af6f79074f5d25ef8194963c464feb0038787",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163",
       "triggerID" : "852af6f79074f5d25ef8194963c464feb0038787",
       "triggerType" : "PUSH"
     }, {
       "hash" : "606f33587dd148ad9e2dc4c9fb91986e4c995bc3",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12168",
       "triggerID" : "606f33587dd148ad9e2dc4c9fb91986e4c995bc3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 852af6f79074f5d25ef8194963c464feb0038787 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12163) 
   * 606f33587dd148ad9e2dc4c9fb91986e4c995bc3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12168) 
   

