You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/13 09:56:14 UTC

[GitHub] [flink] godfreyhe commented on a change in pull request #14606: [Flink-20876][table-planner-blink] Separate the implementation of StreamExecTemporalJoin

godfreyhe commented on a change in pull request #14606:
URL: https://github.com/apache/flink/pull/14606#discussion_r556394806



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTemporalJoin.scala
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalJoin
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin
+import org.apache.flink.table.planner.plan.nodes.exec.utils.JoinSpec
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{TEMPORAL_JOIN_CONDITION, TEMPORAL_JOIN_CONDITION_PRIMARY_KEY}
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
+import org.apache.flink.util.Preconditions.checkState
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rex._
+
+import java.util.Optional
+
+import scala.collection.JavaConverters._
+
+/**
+ * Stream physical node for temporal table join (FOR SYSTEM_TIME AS OF) and
+ * temporal TableFunction join (LATERAL TemporalTableFunction(oproctime)).

Review comment:
       typo: `oproctime` -> `o.proctime`

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
##########
@@ -20,68 +20,54 @@ package org.apache.flink.table.planner.plan.utils
 
 import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator}
+import org.apache.flink.table.planner.plan.nodes.exec.utils.JoinSpec
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition
-import org.apache.flink.table.types.logical.LogicalType
+import org.apache.flink.table.types.logical.{LogicalType, RowType}
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.{Join, JoinInfo}
-import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.rex.RexNode
 import org.apache.calcite.util.ImmutableIntList
-import org.apache.calcite.util.mapping.IntPair
 
 import java.util
 
-import scala.collection.mutable
+import org.apache.flink.table.runtime.types.PlannerTypeUtils
 
 /**
   * Util for [[Join]]s.
   */
 object JoinUtil {
 
   /**
-    * Check and get join left and right keys.
-    */
-  def checkAndGetJoinKeys(
-      keyPairs: List[IntPair],
-      left: RelNode,
-      right: RelNode,
-      allowEmptyKey: Boolean = false): (Array[Int], Array[Int]) = {
-    // get the equality keys
-    val leftKeys = mutable.ArrayBuffer.empty[Int]
-    val rightKeys = mutable.ArrayBuffer.empty[Int]
-    if (keyPairs.isEmpty) {
-      if (allowEmptyKey) {
-        (leftKeys.toArray, rightKeys.toArray)
-      } else {
+   * Validates that join keys in [[JoinSpec]] is compatible in both sides of join.
+   */
+  def validateJoinSpec(
+      joinSpec: JoinSpec,
+      leftType: RowType,
+      rightType: RowType,
+      allowEmptyKey: Boolean = false): Unit = {
+    if (joinSpec.getLeftKeys.isEmpty || !allowEmptyKey) {
         throw new TableException(
           s"Joins should have at least one equality condition.\n" +
-            s"\tleft: ${left.toString}\n\tright: ${right.toString}\n" +
+            s"\tleft: ${leftType}\n\tright: ${rightType}\n" +

Review comment:
       left -> left type
   right -> right type ?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/JoinSpec.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.utils;
+
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** JoinSpec describes how two table will be joined. */
+public class JoinSpec {
+    /** Type of the join. */
+    private final FlinkJoinType joinType;
+    /** 0-based index of join keys in left side. */
+    private final int[] leftKeys;
+    /** 0-based index of join keys in right side. */
+    private final int[] rightKeys;
+    /** whether to filter null values or not. */
+    private final boolean[] filterNulls;
+    /** Non Equi join conditions. */
+    private final @Nullable RexNode nonEquiConditions;

Review comment:
       `nonEquiConditions` -> `nonEquiCondition` ?  (it's not a collection)

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/JoinSpec.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.utils;
+
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** JoinSpec describes how two table will be joined. */

Review comment:
       typo: `two tables`

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGenUtils;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ExprCodeGenerator;
+import org.apache.flink.table.planner.codegen.FunctionCodeGenerator;
+import org.apache.flink.table.planner.codegen.GeneratedExpression;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.JoinSpec;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator;
+import org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import scala.Option;
+
+/**
+ * {@link StreamExecNode} for temporal table join (FOR SYSTEM_TIME AS OF) and temporal TableFunction
+ * join (LATERAL TemporalTableFunction(o.proctime)).
+ *
+ * <p>The legacy temporal table function join is the subset of temporal table join, the only
+ * difference is the validation, we reuse most same logic here.
+ */
+public class StreamExecTemporalJoin extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+
+    private final JoinSpec joinSpec;
+    private final boolean isTemporalFunctionJoin;
+    private final int leftTimeAttributeIndex;
+    private final Optional<Integer> rightTimeAttributeIndex;
+
+    public StreamExecTemporalJoin(
+            JoinSpec joinSpec,
+            boolean isTemporalTableFunctionJoin,
+            int leftTimeAttributeIndex,
+            Optional<Integer> rightTimeAttributeIndex,
+            ExecEdge leftEdge,
+            ExecEdge rightEdge,
+            RowType outputType,
+            String description) {
+        super(Lists.newArrayList(leftEdge, rightEdge), outputType, description);
+        this.joinSpec = joinSpec;
+        this.isTemporalFunctionJoin = isTemporalTableFunctionJoin;

Review comment:
       we can derive this boolean value based on `nonEquiConditions`, so this field can be removed

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashJoin.scala
##########
@@ -54,13 +56,11 @@ class BatchPhysicalHashJoin(
     val tryDistinctBuildRow: Boolean)
   extends BatchPhysicalJoinBase(cluster, traitSet, leftRel, rightRel, condition, joinType) {
 
-  private val (leftKeys, rightKeys) =
-    JoinUtil.checkAndGetJoinKeys(keyPairs, getLeft, getRight, allowEmptyKey = true)
-  val (buildKeys, probeKeys) = if (leftIsBuild) (leftKeys, rightKeys) else (rightKeys, leftKeys)
-
-  // Inputs could be changed. See [[BiRel.replaceInput]].
-  def buildRel: RelNode = if (leftIsBuild) getLeft else getRight
-  def probeRel: RelNode = if (leftIsBuild) getRight else getLeft
+  JoinUtil.validateJoinSpec(
+    joinSpec,
+    FlinkTypeFactory.toLogicalRowType(left.getRowType),
+    FlinkTypeFactory.toLogicalRowType(right.getRowType),
+    allowEmptyKey = true)

Review comment:
       `allowEmptyKey` should be false

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGenUtils;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ExprCodeGenerator;
+import org.apache.flink.table.planner.codegen.FunctionCodeGenerator;
+import org.apache.flink.table.planner.codegen.GeneratedExpression;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.JoinSpec;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator;
+import org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import scala.Option;
+
+/**
+ * {@link StreamExecNode} for temporal table join (FOR SYSTEM_TIME AS OF) and temporal TableFunction
+ * join (LATERAL TemporalTableFunction(o.proctime)).
+ *
+ * <p>The legacy temporal table function join is the subset of temporal table join, the only
+ * difference is the validation, we reuse most same logic here.
+ */
+public class StreamExecTemporalJoin extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+
+    private final JoinSpec joinSpec;
+    private final boolean isTemporalFunctionJoin;
+    private final int leftTimeAttributeIndex;
+    private final Optional<Integer> rightTimeAttributeIndex;
+
+    public StreamExecTemporalJoin(
+            JoinSpec joinSpec,
+            boolean isTemporalTableFunctionJoin,
+            int leftTimeAttributeIndex,
+            Optional<Integer> rightTimeAttributeIndex,
+            ExecEdge leftEdge,
+            ExecEdge rightEdge,
+            RowType outputType,
+            String description) {
+        super(Lists.newArrayList(leftEdge, rightEdge), outputType, description);
+        this.joinSpec = joinSpec;
+        this.isTemporalFunctionJoin = isTemporalTableFunctionJoin;
+        this.leftTimeAttributeIndex = leftTimeAttributeIndex;
+        this.rightTimeAttributeIndex = rightTimeAttributeIndex;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+
+        ExecNode<RowData> leftInput = (ExecNode<RowData>) getInputNodes().get(0);
+        ExecNode<RowData> rightInput = (ExecNode<RowData>) getInputNodes().get(1);
+        RowType leftInputType = (RowType) leftInput.getOutputType();
+        RowType rightInputType = (RowType) rightInput.getOutputType();
+
+        JoinUtil.validateJoinSpec(joinSpec, leftInputType, rightInputType, true);
+
+        FlinkJoinType joinType = joinSpec.getJoinType();
+        if (isTemporalFunctionJoin) {
+            if (joinType != FlinkJoinType.INNER) {
+                throw new ValidationException(
+                        "Temporal table function join currently only support INNER JOIN, "
+                                + "but was "
+                                + joinType
+                                + " JOIN.");

Review comment:
       nit: merge to one line

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTemporalJoin.scala
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalJoin
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin
+import org.apache.flink.table.planner.plan.nodes.exec.utils.JoinSpec
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{TEMPORAL_JOIN_CONDITION, TEMPORAL_JOIN_CONDITION_PRIMARY_KEY}
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
+import org.apache.flink.util.Preconditions.checkState
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rex._
+
+import java.util.Optional
+
+import scala.collection.JavaConverters._
+
+/**
+ * Stream physical node for temporal table join (FOR SYSTEM_TIME AS OF) and
+ * temporal TableFunction join (LATERAL TemporalTableFunction(oproctime)).
+ *
+ * <p>The legacy temporal table function join is the subset of temporal table join,
+ * the only difference is the validation, we reuse most same logic here.
+ */
+class StreamPhysicalTemporalJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftRel: RelNode,
+    rightRel: RelNode,
+    condition: RexNode,
+    joinType: JoinRelType)
+  extends CommonPhysicalJoin(cluster, traitSet, leftRel, rightRel, condition, joinType)
+  with StreamPhysicalRel {
+
+  override def requireWatermark: Boolean = {
+    TemporalJoinUtil.isRowTimeJoin(joinSpec)
+  }
+
+  override def copy(
+      traitSet: RelTraitSet,
+      conditionExpr: RexNode,
+      left: RelNode,
+      right: RelNode,
+      joinType: JoinRelType,
+      semiJoinDone: Boolean): Join = {
+    new StreamPhysicalTemporalJoin(
+      cluster,
+      traitSet,
+      left,
+      right,
+      conditionExpr,
+      joinType)
+  }
+
+  override def translateToExecNode(): ExecNode[_] = {
+    val textualRepresentation = this.toString
+    val rexBuilder = cluster.getRexBuilder
+    val isTemporalFunctionJoin =
+        TemporalJoinUtil.isTemporalFunctionJoin(rexBuilder, joinInfo)
+
+    val leftFieldCount = getLeft.getRowType.getFieldCount
+    val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor(
+      textualRepresentation,
+      leftFieldCount,
+      joinSpec,
+      cluster.getRexBuilder,
+      isTemporalFunctionJoin)
+    val remainingNonEquiJoinPredicates =
+      temporalJoinConditionExtractor.apply(joinSpec.getNonEquiConditions)
+    val temporalJoinSpec = new JoinSpec(
+      joinSpec.getJoinType,
+      joinSpec.getLeftKeys,
+      joinSpec.getRightKeys,
+      joinSpec.getFilterNulls,
+      remainingNonEquiJoinPredicates)
+
+    val (leftTimeAttributeInputRef, rightRowTimeAttributeInputRef: Optional[Integer]) =
+      if (TemporalJoinUtil.isRowTimeJoin(joinSpec)) {
+        checkState(temporalJoinConditionExtractor.leftTimeAttribute.isDefined &&
+          temporalJoinConditionExtractor.rightPrimaryKey.isDefined,
+          "Missing %s in Event-Time temporal join condition", TEMPORAL_JOIN_CONDITION)
+
+        val leftTimeAttributeInputRef = TemporalJoinUtil.extractInputRef(
+          temporalJoinConditionExtractor.leftTimeAttribute.get, textualRepresentation)
+        val rightTimeAttributeInputRef = TemporalJoinUtil.extractInputRef(
+          temporalJoinConditionExtractor.rightTimeAttribute.get, textualRepresentation)
+        val rightInputRef = rightTimeAttributeInputRef - leftFieldCount
+
+        (leftTimeAttributeInputRef, Optional.of(new Integer(rightInputRef)))
+      } else {
+        val leftTimeAttributeInputRef = TemporalJoinUtil.extractInputRef(
+          temporalJoinConditionExtractor.leftTimeAttribute.get, textualRepresentation)
+        // right time attribute defined in temporal join condition iff in Event time join
+        (leftTimeAttributeInputRef, Optional.empty().asInstanceOf[Optional[Integer]])
+      }
+
+    new StreamExecTemporalJoin(
+      temporalJoinSpec,
+      isTemporalFunctionJoin,
+      leftTimeAttributeInputRef,
+      rightRowTimeAttributeInputRef,
+      ExecEdge.DEFAULT,
+      ExecEdge.DEFAULT,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription)
+  }
+
+  /**
+   * TemporalJoinConditionExtractor extracts TEMPORAL_JOIN_CONDITION from non-equi join conditions.

Review comment:
       nit: add link for `TEMPORAL_JOIN_CONDITION`

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
##########
@@ -20,68 +20,54 @@ package org.apache.flink.table.planner.plan.utils
 
 import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator}
+import org.apache.flink.table.planner.plan.nodes.exec.utils.JoinSpec
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition
-import org.apache.flink.table.types.logical.LogicalType
+import org.apache.flink.table.types.logical.{LogicalType, RowType}
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.{Join, JoinInfo}
-import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.rex.RexNode
 import org.apache.calcite.util.ImmutableIntList
-import org.apache.calcite.util.mapping.IntPair
 
 import java.util
 
-import scala.collection.mutable
+import org.apache.flink.table.runtime.types.PlannerTypeUtils
 
 /**
   * Util for [[Join]]s.
   */
 object JoinUtil {
 
   /**
-    * Check and get join left and right keys.
-    */
-  def checkAndGetJoinKeys(
-      keyPairs: List[IntPair],
-      left: RelNode,
-      right: RelNode,
-      allowEmptyKey: Boolean = false): (Array[Int], Array[Int]) = {
-    // get the equality keys
-    val leftKeys = mutable.ArrayBuffer.empty[Int]
-    val rightKeys = mutable.ArrayBuffer.empty[Int]
-    if (keyPairs.isEmpty) {
-      if (allowEmptyKey) {
-        (leftKeys.toArray, rightKeys.toArray)
-      } else {
+   * Validates that join keys in [[JoinSpec]] is compatible in both sides of join.
+   */
+  def validateJoinSpec(
+      joinSpec: JoinSpec,
+      leftType: RowType,
+      rightType: RowType,
+      allowEmptyKey: Boolean = false): Unit = {
+    if (joinSpec.getLeftKeys.isEmpty || !allowEmptyKey) {

Review comment:
       The condition should be `joinSpec.getLeftKeys.isEmpty && !allowEmptyKey`

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGenUtils;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ExprCodeGenerator;
+import org.apache.flink.table.planner.codegen.FunctionCodeGenerator;
+import org.apache.flink.table.planner.codegen.GeneratedExpression;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.JoinSpec;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator;
+import org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import scala.Option;
+
+/**
+ * {@link StreamExecNode} for temporal table join (FOR SYSTEM_TIME AS OF) and temporal TableFunction
+ * join (LATERAL TemporalTableFunction(o.proctime)).
+ *
+ * <p>The legacy temporal table function join is the subset of temporal table join, the only
+ * difference is the validation, we reuse most same logic here.
+ */
+public class StreamExecTemporalJoin extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+
+    private final JoinSpec joinSpec;
+    private final boolean isTemporalFunctionJoin;
+    private final int leftTimeAttributeIndex;
+    private final Optional<Integer> rightTimeAttributeIndex;

Review comment:
       We should avoid using `Optional` for a field that needs to be serialized. I remember some Flink documentation mentioning this before: `Optional` should only be used as a return type, to avoid returning null.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/JoinSpec.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.utils;
+
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** JoinSpec describes how two table will be joined. */
+public class JoinSpec {
+    /** Type of the join. */

Review comment:
       nit: we can add a link for the type, or use {@link FlinkJoinType} directly.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGenUtils;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ExprCodeGenerator;
+import org.apache.flink.table.planner.codegen.FunctionCodeGenerator;
+import org.apache.flink.table.planner.codegen.GeneratedExpression;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.JoinSpec;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator;
+import org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import scala.Option;
+
+/**
+ * {@link StreamExecNode} for temporal table join (FOR SYSTEM_TIME AS OF) and temporal TableFunction
+ * join (LATERAL TemporalTableFunction(o.proctime)).
+ *
+ * <p>The legacy temporal table function join is the subset of temporal table join, the only
+ * difference is the validation, we reuse most same logic here.
+ */
+public class StreamExecTemporalJoin extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+
+    private final JoinSpec joinSpec;
+    private final boolean isTemporalFunctionJoin;
+    private final int leftTimeAttributeIndex;
+    private final Optional<Integer> rightTimeAttributeIndex;
+
+    public StreamExecTemporalJoin(
+            JoinSpec joinSpec,
+            boolean isTemporalTableFunctionJoin,
+            int leftTimeAttributeIndex,
+            Optional<Integer> rightTimeAttributeIndex,
+            ExecEdge leftEdge,
+            ExecEdge rightEdge,
+            RowType outputType,
+            String description) {
+        super(Lists.newArrayList(leftEdge, rightEdge), outputType, description);
+        this.joinSpec = joinSpec;
+        this.isTemporalFunctionJoin = isTemporalTableFunctionJoin;
+        this.leftTimeAttributeIndex = leftTimeAttributeIndex;
+        this.rightTimeAttributeIndex = rightTimeAttributeIndex;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+
+        ExecNode<RowData> leftInput = (ExecNode<RowData>) getInputNodes().get(0);
+        ExecNode<RowData> rightInput = (ExecNode<RowData>) getInputNodes().get(1);
+        RowType leftInputType = (RowType) leftInput.getOutputType();
+        RowType rightInputType = (RowType) rightInput.getOutputType();
+
+        JoinUtil.validateJoinSpec(joinSpec, leftInputType, rightInputType, true);
+
+        FlinkJoinType joinType = joinSpec.getJoinType();
+        if (isTemporalFunctionJoin) {
+            if (joinType != FlinkJoinType.INNER) {
+                throw new ValidationException(
+                        "Temporal table function join currently only support INNER JOIN, "
+                                + "but was "
+                                + joinType
+                                + " JOIN.");
+            }
+        } else {
+            if (joinType != FlinkJoinType.LEFT && joinType != FlinkJoinType.INNER) {
+                throw new TableException(
+                        "Temporal table join currently only support INNER JOIN and LEFT JOIN, "
+                                + "but was "
+                                + joinType
+                                + " JOIN.");
+            }
+        }
+
+        RowType returnType = (RowType) getOutputType();
+
+        TwoInputStreamOperator<RowData, RowData, RowData> joinOperator =
+                getJoinOperator(planner.getTableConfig(), leftInputType, rightInputType);
+        Transformation<RowData> leftTransform = leftInput.translateToPlan(planner);
+        Transformation<RowData> rightTransform = rightInput.translateToPlan(planner);
+
+        TwoInputTransformation<RowData, RowData, RowData> ret =
+                new TwoInputTransformation<>(
+                        leftTransform,
+                        rightTransform,
+                        getDesc(),
+                        joinOperator,
+                        InternalTypeInfo.of(returnType),
+                        leftTransform.getParallelism());
+
+        if (inputsContainSingleton()) {
+            ret.setParallelism(1);
+            ret.setMaxParallelism(1);
+        }
+
+        // set KeyType and Selector for state
+        RowDataKeySelector leftKeySelector = getLeftKeySelector(leftInputType);
+        RowDataKeySelector rightKeySelector = getRightKeySelector(rightInputType);
+        ret.setStateKeySelectors(leftKeySelector, rightKeySelector);
+        LogicalType[] keyTypes =
+                IntStream.of(joinSpec.getLeftKeys())
+                        .mapToObj(idx -> leftInputType.getTypeAt(idx))
+                        .toArray(LogicalType[]::new);
+        ret.setStateKeyType(InternalTypeInfo.ofFields(keyTypes));
+        return ret;
+    }
+
+    private RowDataKeySelector getLeftKeySelector(RowType leftInputType) {
+        return KeySelectorUtil.getRowDataSelector(
+                joinSpec.getLeftKeys(), InternalTypeInfo.of(leftInputType));
+    }
+
+    private RowDataKeySelector getRightKeySelector(RowType rightInputType) {
+        return KeySelectorUtil.getRowDataSelector(
+                joinSpec.getRightKeys(), InternalTypeInfo.of(rightInputType));
+    }
+
+    private TwoInputStreamOperator<RowData, RowData, RowData> getJoinOperator(
+            TableConfig config, RowType leftInputType, RowType rightInputType) {
+
+        // input must not be nullable, because the runtime join function will make sure
+        // the code-generated function won't process null inputs
+        final CodeGeneratorContext ctx = new CodeGeneratorContext(config);
+        final ExprCodeGenerator exprGenerator =
+                new ExprCodeGenerator(ctx, false)
+                        .bindInput(
+                                leftInputType, CodeGenUtils.DEFAULT_INPUT1_TERM(), Option.empty())

Review comment:
       nit: we can use `JavaScalaConversionUtil` to avoid importing a Scala class.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGenUtils;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ExprCodeGenerator;
+import org.apache.flink.table.planner.codegen.FunctionCodeGenerator;
+import org.apache.flink.table.planner.codegen.GeneratedExpression;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.JoinSpec;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator;
+import org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import scala.Option;
+
+/**
+ * {@link StreamExecNode} for temporal table join (FOR SYSTEM_TIME AS OF) and temporal TableFunction
+ * join (LATERAL TemporalTableFunction(o.proctime)).
+ *
+ * <p>The legacy temporal table function join is the subset of temporal table join, the only
+ * difference is the validation, we reuse most same logic here.
+ */
+public class StreamExecTemporalJoin extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+
+    private final JoinSpec joinSpec;
+    private final boolean isTemporalFunctionJoin;
+    private final int leftTimeAttributeIndex;
+    private final Optional<Integer> rightTimeAttributeIndex;
+
+    public StreamExecTemporalJoin(
+            JoinSpec joinSpec,
+            boolean isTemporalTableFunctionJoin,
+            int leftTimeAttributeIndex,
+            Optional<Integer> rightTimeAttributeIndex,
+            ExecEdge leftEdge,
+            ExecEdge rightEdge,
+            RowType outputType,
+            String description) {
+        super(Lists.newArrayList(leftEdge, rightEdge), outputType, description);
+        this.joinSpec = joinSpec;
+        this.isTemporalFunctionJoin = isTemporalTableFunctionJoin;
+        this.leftTimeAttributeIndex = leftTimeAttributeIndex;
+        this.rightTimeAttributeIndex = rightTimeAttributeIndex;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+
+        ExecNode<RowData> leftInput = (ExecNode<RowData>) getInputNodes().get(0);
+        ExecNode<RowData> rightInput = (ExecNode<RowData>) getInputNodes().get(1);
+        RowType leftInputType = (RowType) leftInput.getOutputType();
+        RowType rightInputType = (RowType) rightInput.getOutputType();
+
+        JoinUtil.validateJoinSpec(joinSpec, leftInputType, rightInputType, true);
+
+        FlinkJoinType joinType = joinSpec.getJoinType();
+        if (isTemporalFunctionJoin) {
+            if (joinType != FlinkJoinType.INNER) {
+                throw new ValidationException(
+                        "Temporal table function join currently only support INNER JOIN, "
+                                + "but was "
+                                + joinType
+                                + " JOIN.");
+            }
+        } else {
+            if (joinType != FlinkJoinType.LEFT && joinType != FlinkJoinType.INNER) {
+                throw new TableException(
+                        "Temporal table join currently only support INNER JOIN and LEFT JOIN, "
+                                + "but was "
+                                + joinType
+                                + " JOIN.");
+            }
+        }
+
+        RowType returnType = (RowType) getOutputType();
+
+        TwoInputStreamOperator<RowData, RowData, RowData> joinOperator =
+                getJoinOperator(planner.getTableConfig(), leftInputType, rightInputType);
+        Transformation<RowData> leftTransform = leftInput.translateToPlan(planner);
+        Transformation<RowData> rightTransform = rightInput.translateToPlan(planner);
+
+        TwoInputTransformation<RowData, RowData, RowData> ret =
+                new TwoInputTransformation<>(
+                        leftTransform,
+                        rightTransform,
+                        getDesc(),
+                        joinOperator,
+                        InternalTypeInfo.of(returnType),
+                        leftTransform.getParallelism());
+
+        if (inputsContainSingleton()) {
+            ret.setParallelism(1);
+            ret.setMaxParallelism(1);
+        }
+
+        // set KeyType and Selector for state
+        RowDataKeySelector leftKeySelector = getLeftKeySelector(leftInputType);
+        RowDataKeySelector rightKeySelector = getRightKeySelector(rightInputType);
+        ret.setStateKeySelectors(leftKeySelector, rightKeySelector);
+        LogicalType[] keyTypes =
+                IntStream.of(joinSpec.getLeftKeys())
+                        .mapToObj(idx -> leftInputType.getTypeAt(idx))

Review comment:
       nit: use `leftInputType::getTypeAt` to make IDE happy.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
##########
@@ -20,68 +20,54 @@ package org.apache.flink.table.planner.plan.utils
 
 import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator}
+import org.apache.flink.table.planner.plan.nodes.exec.utils.JoinSpec
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition
-import org.apache.flink.table.types.logical.LogicalType
+import org.apache.flink.table.types.logical.{LogicalType, RowType}
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.{Join, JoinInfo}
-import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.rex.RexNode
 import org.apache.calcite.util.ImmutableIntList
-import org.apache.calcite.util.mapping.IntPair
 
 import java.util
 
-import scala.collection.mutable
+import org.apache.flink.table.runtime.types.PlannerTypeUtils
 
 /**
   * Util for [[Join]]s.
   */
 object JoinUtil {
 
   /**
-    * Check and get join left and right keys.
-    */
-  def checkAndGetJoinKeys(
-      keyPairs: List[IntPair],
-      left: RelNode,
-      right: RelNode,
-      allowEmptyKey: Boolean = false): (Array[Int], Array[Int]) = {
-    // get the equality keys
-    val leftKeys = mutable.ArrayBuffer.empty[Int]
-    val rightKeys = mutable.ArrayBuffer.empty[Int]
-    if (keyPairs.isEmpty) {
-      if (allowEmptyKey) {
-        (leftKeys.toArray, rightKeys.toArray)
-      } else {
+   * Validates that join keys in [[JoinSpec]] is compatible in both sides of join.
+   */
+  def validateJoinSpec(
+      joinSpec: JoinSpec,
+      leftType: RowType,
+      rightType: RowType,
+      allowEmptyKey: Boolean = false): Unit = {
+    if (joinSpec.getLeftKeys.isEmpty || !allowEmptyKey) {
         throw new TableException(
           s"Joins should have at least one equality condition.\n" +
-            s"\tleft: ${left.toString}\n\tright: ${right.toString}\n" +
+            s"\tleft: ${leftType}\n\tright: ${rightType}\n" +
             s"please re-check the join statement and make sure there's " +
             "equality condition for join.")
-      }
-    } else {
-      // at least one equality expression
-      val leftFields = left.getRowType.getFieldList
-      val rightFields = right.getRowType.getFieldList
+    }
 
-      keyPairs.foreach { pair =>
-        val leftKeyType = leftFields.get(pair.source).getType.getSqlTypeName
-        val rightKeyType = rightFields.get(pair.target).getType.getSqlTypeName
+    val leftKeys = joinSpec.getLeftKeys
+    val rightKeys = joinSpec.getRightKeys
+    (0 until joinSpec.getJoinKeySize).foreach { idx =>
+        val leftKeyType = leftType.getTypeAt(leftKeys(idx))
+        val rightKeyType = rightType.getTypeAt(rightKeys(idx))
 
         // check if keys are compatible
-        if (leftKeyType == rightKeyType) {
-          // add key pair
-          leftKeys += pair.source
-          rightKeys += pair.target
-        } else {
+        if (!PlannerTypeUtils.isInteroperable(leftKeyType, rightKeyType)) {
           throw new TableException(
             s"Join: Equality join predicate on incompatible types. " +
-              s"\tLeft: ${left.toString}\n\tright: ${right.toString}\n" +
+              s"\tLeft: ${leftType}\n\tright: ${rightType}\n" +

Review comment:
       nit: ditto

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.scala
##########
@@ -20,19 +20,19 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
 
 import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
 import org.apache.flink.table.planner.plan.nodes.logical._
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTemporalJoin
 import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.JoinRelType
 import org.apache.flink.util.Preconditions.checkState
 
 /**
- * Rule that matches a temporal join node and converts it to [[StreamExecTemporalJoin]],
+ * Rule that matches a temporal join node and converts it to [[StreamPhysicalTemporalJoin]],

Review comment:
       nit: `[[TemporalJoinCondition]]` is an illegal link

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalJoin.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGenUtils;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ExprCodeGenerator;
+import org.apache.flink.table.planner.codegen.FunctionCodeGenerator;
+import org.apache.flink.table.planner.codegen.GeneratedExpression;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.JoinSpec;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator;
+import org.apache.flink.table.runtime.operators.join.temporal.TemporalRowTimeJoinOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import scala.Option;
+
+/**
+ * {@link StreamExecNode} for temporal table join (FOR SYSTEM_TIME AS OF) and temporal TableFunction
+ * join (LATERAL TemporalTableFunction(o.proctime)).
+ *
+ * <p>The legacy temporal table function join is the subset of temporal table join, the only
+ * difference is the validation, we reuse most same logic here.
+ */
+public class StreamExecTemporalJoin extends ExecNodeBase<RowData>
+        implements StreamExecNode<RowData> {
+
+    private final JoinSpec joinSpec;
+    private final boolean isTemporalFunctionJoin;
+    private final int leftTimeAttributeIndex;
+    private final Optional<Integer> rightTimeAttributeIndex;
+
+    public StreamExecTemporalJoin(
+            JoinSpec joinSpec,
+            boolean isTemporalTableFunctionJoin,
+            int leftTimeAttributeIndex,
+            Optional<Integer> rightTimeAttributeIndex,
+            ExecEdge leftEdge,
+            ExecEdge rightEdge,
+            RowType outputType,
+            String description) {
+        super(Lists.newArrayList(leftEdge, rightEdge), outputType, description);
+        this.joinSpec = joinSpec;
+        this.isTemporalFunctionJoin = isTemporalTableFunctionJoin;
+        this.leftTimeAttributeIndex = leftTimeAttributeIndex;
+        this.rightTimeAttributeIndex = rightTimeAttributeIndex;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+
+        ExecNode<RowData> leftInput = (ExecNode<RowData>) getInputNodes().get(0);
+        ExecNode<RowData> rightInput = (ExecNode<RowData>) getInputNodes().get(1);
+        RowType leftInputType = (RowType) leftInput.getOutputType();
+        RowType rightInputType = (RowType) rightInput.getOutputType();
+
+        JoinUtil.validateJoinSpec(joinSpec, leftInputType, rightInputType, true);
+
+        FlinkJoinType joinType = joinSpec.getJoinType();
+        if (isTemporalFunctionJoin) {
+            if (joinType != FlinkJoinType.INNER) {
+                throw new ValidationException(
+                        "Temporal table function join currently only support INNER JOIN, "
+                                + "but was "
+                                + joinType
+                                + " JOIN.");
+            }
+        } else {
+            if (joinType != FlinkJoinType.LEFT && joinType != FlinkJoinType.INNER) {
+                throw new TableException(
+                        "Temporal table join currently only support INNER JOIN and LEFT JOIN, "
+                                + "but was "
+                                + joinType
+                                + " JOIN.");

Review comment:
       ditto

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
##########
@@ -84,14 +70,7 @@ public BatchExecHashJoin(
             RowType outputType,
             String description) {
         super(inputEdges, outputType, description);

Review comment:
       Pass `leftEdge` and `rightEdge` instead of `inputEdges`, like `StreamExecTemporalJoin` does?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
##########
@@ -20,68 +20,54 @@ package org.apache.flink.table.planner.plan.utils
 
 import org.apache.flink.table.api.{TableConfig, TableException}
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator}
+import org.apache.flink.table.planner.plan.nodes.exec.utils.JoinSpec
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition
-import org.apache.flink.table.types.logical.LogicalType
+import org.apache.flink.table.types.logical.{LogicalType, RowType}
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.{Join, JoinInfo}
-import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.rex.RexNode
 import org.apache.calcite.util.ImmutableIntList
-import org.apache.calcite.util.mapping.IntPair
 
 import java.util
 
-import scala.collection.mutable
+import org.apache.flink.table.runtime.types.PlannerTypeUtils
 
 /**
   * Util for [[Join]]s.
   */
 object JoinUtil {
 
   /**
-    * Check and get join left and right keys.
-    */
-  def checkAndGetJoinKeys(
-      keyPairs: List[IntPair],
-      left: RelNode,
-      right: RelNode,
-      allowEmptyKey: Boolean = false): (Array[Int], Array[Int]) = {
-    // get the equality keys
-    val leftKeys = mutable.ArrayBuffer.empty[Int]
-    val rightKeys = mutable.ArrayBuffer.empty[Int]
-    if (keyPairs.isEmpty) {
-      if (allowEmptyKey) {
-        (leftKeys.toArray, rightKeys.toArray)
-      } else {
+   * Validates that join keys in [[JoinSpec]] is compatible in both sides of join.
+   */
+  def validateJoinSpec(
+      joinSpec: JoinSpec,
+      leftType: RowType,
+      rightType: RowType,
+      allowEmptyKey: Boolean = false): Unit = {
+    if (joinSpec.getLeftKeys.isEmpty || !allowEmptyKey) {
         throw new TableException(
           s"Joins should have at least one equality condition.\n" +
-            s"\tleft: ${left.toString}\n\tright: ${right.toString}\n" +
+            s"\tleft: ${leftType}\n\tright: ${rightType}\n" +

Review comment:
       nit: braces are unnecessary.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalTemporalJoinRule.scala
##########
@@ -20,19 +20,19 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
 
 import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
 import org.apache.flink.table.planner.plan.nodes.logical._
-import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTemporalJoin
 import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.JoinRelType
 import org.apache.flink.util.Preconditions.checkState
 
 /**
- * Rule that matches a temporal join node and converts it to [[StreamExecTemporalJoin]],
+ * Rule that matches a temporal join node and converts it to [[StreamPhysicalTemporalJoin]],
  * the temporal join node is a [[FlinkLogicalJoin]] which contains [[TemporalJoinCondition]].
  */
-class StreamExecTemporalJoinRule
-  extends StreamExecJoinRuleBase("StreamExecJoinRuleBase") {
+class StreamPhysicalTemporalJoinRule
+  extends StreamPhysicalJoinRuleBase("StreamExecJoinRuleBase") {

Review comment:
       change the description to `StreamPhysicalTemporalJoinRule`

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
##########
@@ -84,14 +70,7 @@ public BatchExecHashJoin(
             RowType outputType,
             String description) {
         super(inputEdges, outputType, description);
-        this.joinType = checkNotNull(joinType);
-        this.leftKeys = checkNotNull(leftKeys);
-        this.rightKeys = checkNotNull(rightKeys);
-        this.filterNulls = checkNotNull(filterNulls);
-        checkArgument(leftKeys.length > 0 && leftKeys.length == rightKeys.length);

Review comment:
       The join keys should not be empty. How about using `JoinUtil.validateJoinSpec` to validate the given joinSpec?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalTemporalJoin.scala
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalJoin
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin
+import org.apache.flink.table.planner.plan.nodes.exec.utils.JoinSpec
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil.{TEMPORAL_JOIN_CONDITION, TEMPORAL_JOIN_CONDITION_PRIMARY_KEY}
+import org.apache.flink.table.planner.plan.utils.TemporalJoinUtil
+import org.apache.flink.util.Preconditions.checkState
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.core.{Join, JoinRelType}
+import org.apache.calcite.rex._
+
+import java.util.Optional
+
+import scala.collection.JavaConverters._
+
+/**
+ * Stream physical node for temporal table join (FOR SYSTEM_TIME AS OF) and
+ * temporal TableFunction join (LATERAL TemporalTableFunction(o.proctime)).
+ *
+ * <p>The legacy temporal table function join is a subset of the temporal table join;
+ * the only difference is the validation, so we reuse most of the same logic here.
+ */
+class StreamPhysicalTemporalJoin(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    leftRel: RelNode,
+    rightRel: RelNode,
+    condition: RexNode,
+    joinType: JoinRelType)
+  extends CommonPhysicalJoin(cluster, traitSet, leftRel, rightRel, condition, joinType)
+  with StreamPhysicalRel {
+
+  override def requireWatermark: Boolean = {
+    TemporalJoinUtil.isRowTimeJoin(joinSpec)
+  }
+
+  override def copy(
+      traitSet: RelTraitSet,
+      conditionExpr: RexNode,
+      left: RelNode,
+      right: RelNode,
+      joinType: JoinRelType,
+      semiJoinDone: Boolean): Join = {
+    new StreamPhysicalTemporalJoin(
+      cluster,
+      traitSet,
+      left,
+      right,
+      conditionExpr,
+      joinType)
+  }
+
+  override def translateToExecNode(): ExecNode[_] = {
+    val textualRepresentation = this.toString
+    val rexBuilder = cluster.getRexBuilder
+    val isTemporalFunctionJoin =
+        TemporalJoinUtil.isTemporalFunctionJoin(rexBuilder, joinInfo)
+
+    val leftFieldCount = getLeft.getRowType.getFieldCount
+    val temporalJoinConditionExtractor = new TemporalJoinConditionExtractor(
+      textualRepresentation,
+      leftFieldCount,
+      joinSpec,
+      cluster.getRexBuilder,
+      isTemporalFunctionJoin)
+    val remainingNonEquiJoinPredicates =
+      temporalJoinConditionExtractor.apply(joinSpec.getNonEquiConditions)
+    val temporalJoinSpec = new JoinSpec(
+      joinSpec.getJoinType,
+      joinSpec.getLeftKeys,
+      joinSpec.getRightKeys,
+      joinSpec.getFilterNulls,
+      remainingNonEquiJoinPredicates)
+
+    val (leftTimeAttributeInputRef, rightRowTimeAttributeInputRef: Optional[Integer]) =
+      if (TemporalJoinUtil.isRowTimeJoin(joinSpec)) {
+        checkState(temporalJoinConditionExtractor.leftTimeAttribute.isDefined &&
+          temporalJoinConditionExtractor.rightPrimaryKey.isDefined,
+          "Missing %s in Event-Time temporal join condition", TEMPORAL_JOIN_CONDITION)
+
+        val leftTimeAttributeInputRef = TemporalJoinUtil.extractInputRef(
+          temporalJoinConditionExtractor.leftTimeAttribute.get, textualRepresentation)
+        val rightTimeAttributeInputRef = TemporalJoinUtil.extractInputRef(
+          temporalJoinConditionExtractor.rightTimeAttribute.get, textualRepresentation)
+        val rightInputRef = rightTimeAttributeInputRef - leftFieldCount
+
+        (leftTimeAttributeInputRef, Optional.of(new Integer(rightInputRef)))
+      } else {
+        val leftTimeAttributeInputRef = TemporalJoinUtil.extractInputRef(
+          temporalJoinConditionExtractor.leftTimeAttribute.get, textualRepresentation)
+        // right time attribute defined in temporal join condition iff in Event time join
+        (leftTimeAttributeInputRef, Optional.empty().asInstanceOf[Optional[Integer]])
+      }
+
+    new StreamExecTemporalJoin(
+      temporalJoinSpec,
+      isTemporalFunctionJoin,
+      leftTimeAttributeInputRef,
+      rightRowTimeAttributeInputRef,
+      ExecEdge.DEFAULT,
+      ExecEdge.DEFAULT,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription)
+  }
+
+  /**
+   * TemporalJoinConditionExtractor extracts TEMPORAL_JOIN_CONDITION from non-equi join conditions.
+   *
+   * <p>TimeAttributes of both sides and primary keys of right side will be extracted and
+   * the TEMPORAL_JOIN_CONDITION RexCall will be pruned after extraction. </p>
+   */
+  private class TemporalJoinConditionExtractor(

Review comment:
       this class also extracts `TEMPORAL_JOIN_CONDITION_PRIMARY_KEY`

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/JoinSpec.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.utils;
+
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** JoinSpec describes how two tables will be joined. */
+public class JoinSpec {
+    /** Type of the join. */
+    private final FlinkJoinType joinType;
+    /** 0-based index of join keys in left side. */
+    private final int[] leftKeys;
+    /** 0-based index of join keys in right side. */
+    private final int[] rightKeys;
+    /** whether to filter null values or not. */
+    private final boolean[] filterNulls;
+    /** Non Equi join conditions. */
+    private final @Nullable RexNode nonEquiConditions;
+
+    public JoinSpec(
+            FlinkJoinType joinType,
+            int[] leftKeys,
+            int[] rightKeys,
+            boolean[] filterNulls,
+            @Nullable RexNode nonEquiConditions) {
+        this.joinType = Preconditions.checkNotNull(joinType);
+        this.leftKeys = Preconditions.checkNotNull(leftKeys);
+        this.rightKeys = Preconditions.checkNotNull(rightKeys);
+        this.filterNulls = Preconditions.checkNotNull(filterNulls);
+        Preconditions.checkArgument(leftKeys.length == rightKeys.length);
+        Preconditions.checkArgument(leftKeys.length == filterNulls.length);
+
+        this.nonEquiConditions = nonEquiConditions;
+    }
+
+    public FlinkJoinType getJoinType() {
+        return joinType;
+    }
+
+    public int[] getLeftKeys() {
+        return leftKeys;
+    }
+
+    public int[] getRightKeys() {
+        return rightKeys;
+    }
+
+    public boolean[] getFilterNulls() {
+        return filterNulls;
+    }
+
+    @Nullable
+    public RexNode getNonEquiConditions() {
+        return nonEquiConditions;
+    }
+
+    /** Gets number of keys in join key. */
+    public int getJoinKeySize() {
+        return leftKeys.length;
+    }
+
+    /** Creates a JoinSpecBuilder. */
+    public static JoinSpecBuilder builder() {
+        return new JoinSpecBuilder();
+    }
+
+    /** Utils class to build a {@link JoinSpec}. */
+    public static class JoinSpecBuilder {
+        private FlinkJoinType joinType;
+        private final List<Integer> leftKeys = new ArrayList<>();
+        private final List<Integer> rightKeys = new ArrayList<>();
+        private final List<Boolean> filterNulls = new ArrayList<>();
+        private @Nullable RexNode nonEquiConditions;
+
+        public JoinSpecBuilder joinType(FlinkJoinType joinType) {
+            this.joinType = joinType;
+            return this;
+        }
+
+        /** Parse join info from given rel nodes and condition. */
+        public JoinSpecBuilder parseCondition(
+                RexBuilder rexBuilder, RelNode left, RelNode right, RexNode condition) {

Review comment:
       `RexBuilder rexBuilder` is unnecessary; we can get it from `left.getCluster().getRexBuilder()`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org