You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/12/29 09:57:46 UTC

[GitHub] [flink] wenlong88 opened a new pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

wenlong88 opened a new pull request #14516:
URL: https://github.com/apache/flink/pull/14516


   ## What is the purpose of the change
   Separate implementation of HashJoin/NestedLoopJoin/SortMergeJoin in batch
   
   ## Brief change log
   1. Introduce BatchPhysicalHashJoin, and make BatchExecHashJoin only extended from ExecNode
   2. Introduce BatchPhysicalNestedLoopJoin, and make BatchExecNestedLoopJoin only extended from ExecNode
   3. Introduce BatchPhysicalSortMergeJoin, and make BatchExecSortMergeJoin only extended from ExecNode
   
   ## Verifying this change
   This change is already covered by existing tests
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wenlong88 commented on a change in pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
wenlong88 commented on a change in pull request #14516:
URL: https://github.com/apache/flink/pull/14516#discussion_r552521445



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LongHashJoinGenerator;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.HashJoinOperator;
+import org.apache.flink.table.runtime.operators.join.HashJoinType;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** Batch {@link ExecNode} for Hash Join. */
+public class BatchExecHashJoin extends ExecNodeBase<RowData> implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;
+    private final boolean leftIsBuild;
+    private final int leftAvgRowSize;
+    private final int rightAvgRowSize;
+    private final long leftRowCount;
+    private final long rightRowCount;
+    private final boolean tryDistinctBuildRow;
+
+    public BatchExecHashJoin(
+            FlinkJoinType joinType,
+            int[] leftKeys,
+            int[] rightKeys,
+            boolean[] filterNulls,
+            RexNode nonEquiConditions,
+            int leftAvgRowSize,
+            int rightAvgRowSize,
+            long leftRowCount,
+            long rightRowCount,
+            boolean leftIsBuild,
+            boolean tryDistinctBuildRow,
+            List<ExecEdge> inputEdges,
+            LogicalType outputType,
+            String description) {
+        super(inputEdges, outputType, description);
+        this.joinType = joinType;
+
+        Preconditions.checkArgument(leftKeys.length == rightKeys.length);
+        Preconditions.checkArgument(leftKeys.length == filterNulls.length);
+        this.leftKeys = leftKeys;
+        this.rightKeys = rightKeys;
+        this.filterNulls = filterNulls;
+        this.nonEquiConditions = nonEquiConditions;
+        this.leftIsBuild = leftIsBuild;
+        this.leftAvgRowSize = leftAvgRowSize;
+        this.rightAvgRowSize = rightAvgRowSize;
+        this.leftRowCount = leftRowCount;
+        this.rightRowCount = rightRowCount;
+        this.tryDistinctBuildRow = tryDistinctBuildRow;
+    }
+
+    @Override
+    @SuppressWarnings("Unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        TableConfig config = planner.getTableConfig();
+
+        Transformation<RowData> lInput =
+                (Transformation<RowData>) getInputNodes().get(0).translateToPlan(planner);
+        Transformation<RowData> rInput =
+                (Transformation<RowData>) getInputNodes().get(1).translateToPlan(planner);
+
+        // get type
+        RowType lType = (RowType) getInputNodes().get(0).getOutputType();
+        RowType rType = (RowType) getInputNodes().get(1).getOutputType();
+
+        LogicalType[] keyFieldTypes =
+                IntStream.of(leftKeys).mapToObj(lType::getTypeAt).toArray(LogicalType[]::new);
+        RowType keyType = RowType.of(keyFieldTypes);
+
+        GeneratedJoinCondition condFunc =
+                JoinUtil.generateConditionFunction(config, nonEquiConditions, lType, rType);
+
+        // projection for equals
+        GeneratedProjection lProj =
+                ProjectionCodeGenerator.generateProjection(
+                        new CodeGeneratorContext(config),
+                        "HashJoinLeftProjection",
+                        lType,
+                        keyType,
+                        leftKeys);
+        GeneratedProjection rProj =
+                ProjectionCodeGenerator.generateProjection(
+                        new CodeGeneratorContext(config),
+                        "HashJoinRightProjection",
+                        rType,
+                        keyType,
+                        rightKeys);
+
+        Transformation<RowData> build;
+        Transformation<RowData> probe;
+        GeneratedProjection bProj;
+        GeneratedProjection pProj;
+        int[] bKeys;
+        int[] pKeys;
+        RowType bType;
+        RowType pType;
+        int bRowSize;
+        long bRowCount;
+        long pRowCount;
+        boolean reverseJoin = !leftIsBuild;
+        if (leftIsBuild) {
+            build = lInput;
+            bProj = lProj;
+            bType = lType;
+            bRowSize = leftAvgRowSize;
+            bRowCount = leftRowCount;
+            bKeys = leftKeys;
+
+            probe = rInput;
+            pProj = rProj;
+            pType = rType;
+            pRowCount = leftRowCount;
+            pKeys = rightKeys;
+        } else {
+            build = rInput;
+            bProj = rProj;
+            bType = rType;
+            bRowSize = rightAvgRowSize;
+            bRowCount = rightRowCount;
+            bKeys = rightKeys;
+
+            probe = lInput;
+            pProj = lProj;
+            pType = lType;
+            pRowCount = leftRowCount;
+            pKeys = leftKeys;
+        }

Review comment:
       the flag leftIsBuild cannot be reduced, so there is no need to specified build/probe side in the constructor, it is better to left them inside the ExecNode.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 09d3419bc7ef88969ad62f2d19a66e586610bdcf Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe commented on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-757177059


   Thanks for the update. LGTM overall, one minor comment: we could also use SortSpec in `ComparatorCodeGenerator`, and please fix the compile error


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b87220ccdee330380bc015427b9f4b445173e324 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721) 
   * 61ab76b08a6b4603d6f9dd554bddd742a8d6fde9 UNKNOWN
   * dc919c934588343a3a4cbfb55e75f0f7fe325a58 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b87220ccdee330380bc015427b9f4b445173e324 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721) 
   * 61ab76b08a6b4603d6f9dd554bddd742a8d6fde9 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe closed pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
godfreyhe closed pull request #14516:
URL: https://github.com/apache/flink/pull/14516


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774",
       "triggerID" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11790",
       "triggerID" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7422b497591e2888b6ec9cc4136d5dffa46b550d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11813",
       "triggerID" : "7422b497591e2888b6ec9cc4136d5dffa46b550d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0ca863216b6a5d5e54f833bacf6eb5c794bb142d",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11868",
       "triggerID" : "0ca863216b6a5d5e54f833bacf6eb5c794bb142d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5789214ded1d1654cdeb384aaad5afa4337ac3d8",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5789214ded1d1654cdeb384aaad5afa4337ac3d8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 61ab76b08a6b4603d6f9dd554bddd742a8d6fde9 UNKNOWN
   * 0ca863216b6a5d5e54f833bacf6eb5c794bb142d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11868) 
   * 5789214ded1d1654cdeb384aaad5afa4337ac3d8 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe commented on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-757177059


   Thanks for the update. LGTM overall, one minor comment: we could also use SortSpec in `ComparatorCodeGenerator`, and please fix the compile error


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774",
       "triggerID" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11790",
       "triggerID" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7422b497591e2888b6ec9cc4136d5dffa46b550d",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11813",
       "triggerID" : "7422b497591e2888b6ec9cc4136d5dffa46b550d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 61ab76b08a6b4603d6f9dd554bddd742a8d6fde9 UNKNOWN
   * 6dd3de68f91d979bdb4288f501fc703fe4317ce5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11790) 
   * 7422b497591e2888b6ec9cc4136d5dffa46b550d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11813) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774",
       "triggerID" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11790",
       "triggerID" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7422b497591e2888b6ec9cc4136d5dffa46b550d",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11813",
       "triggerID" : "7422b497591e2888b6ec9cc4136d5dffa46b550d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0ca863216b6a5d5e54f833bacf6eb5c794bb142d",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11868",
       "triggerID" : "0ca863216b6a5d5e54f833bacf6eb5c794bb142d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 61ab76b08a6b4603d6f9dd554bddd742a8d6fde9 UNKNOWN
   * 7422b497591e2888b6ec9cc4136d5dffa46b550d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11813) 
   * 0ca863216b6a5d5e54f833bacf6eb5c794bb142d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11868) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe commented on a change in pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #14516:
URL: https://github.com/apache/flink/pull/14516#discussion_r553721184



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortMergeJoin.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.SortUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.SortMergeJoinOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import scala.Tuple3;
+
+/** Batch {@link ExecNode} for Sort Merge Join. */
+public class BatchExecSortMergeJoin extends ExecNodeBase<RowData>
+        implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;
+    private final boolean leftIsSmaller;
+
+    public BatchExecSortMergeJoin(
+            FlinkJoinType joinType,
+            int[] leftKeys,
+            int[] rightKeys,
+            boolean[] filterNulls,
+            RexNode nonEquiConditions,
+            boolean leftIsSmaller,
+            List<ExecEdge> inputEdges,
+            LogicalType outputType,
+            String description) {
+        super(inputEdges, outputType, description);
+        this.joinType = joinType;
+
+        Preconditions.checkArgument(leftKeys.length == rightKeys.length);
+        Preconditions.checkArgument(leftKeys.length == filterNulls.length);
+        this.leftKeys = leftKeys;
+        this.rightKeys = rightKeys;
+        this.filterNulls = filterNulls;
+        this.nonEquiConditions = nonEquiConditions;
+        this.leftIsSmaller = leftIsSmaller;
+    }
+
+    @Override
+    @SuppressWarnings("Unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        TableConfig config = planner.getTableConfig();
+
+        Transformation<RowData> lInput =
+                (Transformation<RowData>) getInputNodes().get(0).translateToPlan(planner);
+        Transformation<RowData> rInput =
+                (Transformation<RowData>) getInputNodes().get(1).translateToPlan(planner);
+
+        // get type
+        RowType lType = (RowType) getInputNodes().get(0).getOutputType();
+        RowType rType = (RowType) getInputNodes().get(1).getOutputType();
+
+        LogicalType[] keyFieldTypes =
+                IntStream.of(leftKeys).mapToObj(lType::getTypeAt).toArray(LogicalType[]::new);
+        RowType keyType = RowType.of(keyFieldTypes);
+
+        GeneratedJoinCondition condFunc =
+                JoinUtil.generateConditionFunction(config, nonEquiConditions, lType, rType);
+
+        long externalBufferMemory =
+                MemorySize.parse(
+                                config.getConfiguration()
+                                        .getString(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY))
+                        .getBytes();
+        long sortMemory =
+                MemorySize.parse(
+                                config.getConfiguration()
+                                        .getString(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_RESOURCE_SORT_MEMORY))
+                        .getBytes();
+        int externalBufferNum = 1;
+        if (joinType == FlinkJoinType.FULL) {
+            externalBufferNum = 2;
+        }
+
+        long managedMemory = externalBufferMemory * externalBufferNum + sortMemory * 2;
+
+        SortCodeGenerator leftSortGen = newSortGen(config, leftKeys, lType);
+        SortCodeGenerator rightSortGen = newSortGen(config, rightKeys, rType);
+
+        int[] keyPositions = IntStream.range(0, leftKeys.length).toArray();
+        SortMergeJoinOperator operator =
+                new SortMergeJoinOperator(
+                        1.0 * externalBufferMemory / managedMemory,
+                        joinType,
+                        leftIsSmaller,
+                        condFunc,
+                        ProjectionCodeGenerator.generateProjection(
+                                new CodeGeneratorContext(config),
+                                "SMJProjection",
+                                lType,
+                                keyType,
+                                leftKeys),
+                        ProjectionCodeGenerator.generateProjection(
+                                new CodeGeneratorContext(config),
+                                "SMJProjection",
+                                rType,
+                                keyType,
+                                rightKeys),
+                        leftSortGen.generateNormalizedKeyComputer("LeftComputer"),
+                        leftSortGen.generateRecordComparator("LeftComparator"),
+                        rightSortGen.generateNormalizedKeyComputer("RightComputer"),
+                        rightSortGen.generateRecordComparator("RightComparator"),
+                        newSortGen(config, keyPositions, keyType)
+                                .generateRecordComparator("KeyComparator"),
+                        filterNulls);
+
+        TwoInputTransformation<RowData, RowData, RowData> ret =
+                ExecNodeUtil.createTwoInputTransformation(
+                        lInput,
+                        rInput,
+                        getDesc(),
+                        SimpleOperatorFactory.of(operator),
+                        InternalTypeInfo.of(getOutputType()),
+                        rInput.getParallelism(),
+                        managedMemory);
+
+        if (inputsContainSingleton()) {
+            ret.setParallelism(1);
+            ret.setMaxParallelism(1);
+        }
+        return ret;
+    }
+
+    private SortCodeGenerator newSortGen(TableConfig config, int[] originalKeys, RowType type) {
+        boolean[] originalOrders = new boolean[originalKeys.length];
+        Arrays.fill(originalOrders, true);
+        boolean[] nullsIsLast = SortUtil.getNullDefaultOrders(originalOrders);
+        Tuple3<int[], boolean[], boolean[]> ret =
+                SortUtil.deduplicateSortKeys(originalKeys, originalOrders, nullsIsLast);

Review comment:
       return `SortSpec` directly ? even we can use `SortSpec` in `SortCodeGenerator` ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752019267


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 5789214ded1d1654cdeb384aaad5afa4337ac3d8 (Fri May 28 07:05:58 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 09d3419bc7ef88969ad62f2d19a66e586610bdcf Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442) 
   * 5601ca0f26b9fdb130b0a030e7326d8b1287c76c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe commented on a change in pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #14516:
URL: https://github.com/apache/flink/pull/14516#discussion_r553721508



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LongHashJoinGenerator;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.HashJoinOperator;
+import org.apache.flink.table.runtime.operators.join.HashJoinType;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** Batch {@link ExecNode} for Hash Join. */
+public class BatchExecHashJoin extends ExecNodeBase<RowData> implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;
+    private final boolean leftIsBuild;
+    private final int leftAvgRowSize;
+    private final int rightAvgRowSize;
+    private final long leftRowCount;
+    private final long rightRowCount;
+    private final boolean tryDistinctBuildRow;
+
+    public BatchExecHashJoin(
+            FlinkJoinType joinType,
+            int[] leftKeys,
+            int[] rightKeys,
+            boolean[] filterNulls,
+            RexNode nonEquiConditions,
+            int leftAvgRowSize,
+            int rightAvgRowSize,
+            long leftRowCount,
+            long rightRowCount,
+            boolean leftIsBuild,
+            boolean tryDistinctBuildRow,
+            List<ExecEdge> inputEdges,
+            LogicalType outputType,
+            String description) {
+        super(inputEdges, outputType, description);
+        this.joinType = joinType;
+
+        Preconditions.checkArgument(leftKeys.length == rightKeys.length);

Review comment:
       I think this validation is necessary.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 09d3419bc7ef88969ad62f2d19a66e586610bdcf UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774",
       "triggerID" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 61ab76b08a6b4603d6f9dd554bddd742a8d6fde9 UNKNOWN
   * dc919c934588343a3a4cbfb55e75f0f7fe325a58 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b87220ccdee330380bc015427b9f4b445173e324 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774",
       "triggerID" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11790",
       "triggerID" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 61ab76b08a6b4603d6f9dd554bddd742a8d6fde9 UNKNOWN
   * dc919c934588343a3a4cbfb55e75f0f7fe325a58 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774) 
   * 6dd3de68f91d979bdb4288f501fc703fe4317ce5 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11790) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe commented on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-758601295


   Thanks for the contribution, I will merge this pr 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774",
       "triggerID" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11790",
       "triggerID" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7422b497591e2888b6ec9cc4136d5dffa46b550d",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11813",
       "triggerID" : "7422b497591e2888b6ec9cc4136d5dffa46b550d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 61ab76b08a6b4603d6f9dd554bddd742a8d6fde9 UNKNOWN
   * 7422b497591e2888b6ec9cc4136d5dffa46b550d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11813) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774",
       "triggerID" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 61ab76b08a6b4603d6f9dd554bddd742a8d6fde9 UNKNOWN
   * dc919c934588343a3a4cbfb55e75f0f7fe325a58 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774) 
   * 6dd3de68f91d979bdb4288f501fc703fe4317ce5 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774",
       "triggerID" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11790",
       "triggerID" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 61ab76b08a6b4603d6f9dd554bddd742a8d6fde9 UNKNOWN
   * 6dd3de68f91d979bdb4288f501fc703fe4317ce5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11790) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774",
       "triggerID" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11790",
       "triggerID" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7422b497591e2888b6ec9cc4136d5dffa46b550d",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11813",
       "triggerID" : "7422b497591e2888b6ec9cc4136d5dffa46b550d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0ca863216b6a5d5e54f833bacf6eb5c794bb142d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0ca863216b6a5d5e54f833bacf6eb5c794bb142d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 61ab76b08a6b4603d6f9dd554bddd742a8d6fde9 UNKNOWN
   * 7422b497591e2888b6ec9cc4136d5dffa46b550d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11813) 
   * 0ca863216b6a5d5e54f833bacf6eb5c794bb142d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774",
       "triggerID" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11790",
       "triggerID" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7422b497591e2888b6ec9cc4136d5dffa46b550d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "7422b497591e2888b6ec9cc4136d5dffa46b550d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 61ab76b08a6b4603d6f9dd554bddd742a8d6fde9 UNKNOWN
   * 6dd3de68f91d979bdb4288f501fc703fe4317ce5 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11790) 
   * 7422b497591e2888b6ec9cc4136d5dffa46b550d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5601ca0f26b9fdb130b0a030e7326d8b1287c76c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718) 
   * b87220ccdee330380bc015427b9f4b445173e324 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774",
       "triggerID" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11790",
       "triggerID" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7422b497591e2888b6ec9cc4136d5dffa46b550d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11813",
       "triggerID" : "7422b497591e2888b6ec9cc4136d5dffa46b550d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0ca863216b6a5d5e54f833bacf6eb5c794bb142d",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11868",
       "triggerID" : "0ca863216b6a5d5e54f833bacf6eb5c794bb142d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 61ab76b08a6b4603d6f9dd554bddd742a8d6fde9 UNKNOWN
   * 0ca863216b6a5d5e54f833bacf6eb5c794bb142d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11868) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe commented on a change in pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #14516:
URL: https://github.com/apache/flink/pull/14516#discussion_r555526415



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecRank.java
##########
@@ -121,16 +121,24 @@ public StreamExecRank(
         int[] sortFields = sortSpec.getFieldIndices();
         RowDataKeySelector sortKeySelector =
                 KeySelectorUtil.getRowDataSelector(sortFields, inputRowTypeInfo);
+        // create a sort spec on sort keys.
         int[] sortKeyPositions = IntStream.range(0, sortFields.length).toArray();
+        SortSpec.SortSpecBuilder builder = SortSpec.builder();
+        IntStream.range(0, sortFields.length)
+                .forEach(
+                        idx ->
+                                builder.addField(
+                                        idx,
+                                        sortSpec.getFieldSpec(idx).getIsAscendingOrder(),
+                                        sortSpec.getFieldSpec(idx).getNullIsLast()));
+        SortSpec sortSpecInSortKey = builder.build();

Review comment:
       add a `createSubSortSpec(int startIndex, int endIndex)` method in `SortSpec` and replace this code snippet with the new method ? 

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/sort/ComparatorCodeGenerator.scala
##########
@@ -36,25 +37,27 @@ object ComparatorCodeGenerator {
     * @param conf        Table config.
     * @param name        Class name of the function.
     *                    Does not need to be unique but has to be a valid Java class identifier.
-    * @param keys        key positions describe which fields are keys in what order.
-    * @param keyTypes    types for the key fields, in the same order as the key fields.
-    * @param orders      sorting orders for the key fields.
-    * @param nullsIsLast Ordering of nulls.
+    * @param input        input type.
+    * @param sortSpec     sort specification.
     * @return A GeneratedRecordComparator
     */
   def gen(
       conf: TableConfig,
       name: String,
-      keys: Array[Int],
-      keyTypes: Array[LogicalType],
-      orders: Array[Boolean],
-      nullsIsLast: Array[Boolean]): GeneratedRecordComparator = {
+      input: RowType,
+      sortSpec: SortSpec): GeneratedRecordComparator = {
     val className = newName(name)
     val baseClass = classOf[RecordComparator]
 
     val ctx = new CodeGeneratorContext(conf)
     val compareCode = GenerateUtils.generateRowCompare(
-      ctx, keys, keyTypes, orders, nullsIsLast, "o1", "o2")
+        ctx,
+        sortSpec.getFieldIndices,
+        sortSpec.getFieldTypes(input),
+        sortSpec.getAscendingOrders,
+        sortSpec.getNullsIsLast,

Review comment:
       replace the sort related parameters with `SortSpec` for `GenerateUtils.generateRowCompare` method ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] godfreyhe commented on a change in pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #14516:
URL: https://github.com/apache/flink/pull/14516#discussion_r551222946



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LongHashJoinGenerator;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.HashJoinOperator;
+import org.apache.flink.table.runtime.operators.join.HashJoinType;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** Batch {@link ExecNode} for Hash Join. */
+public class BatchExecHashJoin extends ExecNodeBase<RowData> implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;
+    private final boolean leftIsBuild;
+    private final int leftAvgRowSize;
+    private final int rightAvgRowSize;
+    private final long leftRowCount;
+    private final long rightRowCount;

Review comment:
       add prefix `estimated` ?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LongHashJoinGenerator;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.HashJoinOperator;
+import org.apache.flink.table.runtime.operators.join.HashJoinType;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** Batch {@link ExecNode} for Hash Join. */
+public class BatchExecHashJoin extends ExecNodeBase<RowData> implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;
+    private final boolean leftIsBuild;
+    private final int leftAvgRowSize;
+    private final int rightAvgRowSize;
+    private final long leftRowCount;
+    private final long rightRowCount;
+    private final boolean tryDistinctBuildRow;
+
+    public BatchExecHashJoin(
+            FlinkJoinType joinType,
+            int[] leftKeys,
+            int[] rightKeys,
+            boolean[] filterNulls,
+            RexNode nonEquiConditions,
+            int leftAvgRowSize,
+            int rightAvgRowSize,
+            long leftRowCount,
+            long rightRowCount,
+            boolean leftIsBuild,
+            boolean tryDistinctBuildRow,
+            List<ExecEdge> inputEdges,
+            LogicalType outputType,
+            String description) {
+        super(inputEdges, outputType, description);
+        this.joinType = joinType;
+
+        Preconditions.checkArgument(leftKeys.length == rightKeys.length);
+        Preconditions.checkArgument(leftKeys.length == filterNulls.length);
+        this.leftKeys = leftKeys;
+        this.rightKeys = rightKeys;
+        this.filterNulls = filterNulls;
+        this.nonEquiConditions = nonEquiConditions;
+        this.leftIsBuild = leftIsBuild;
+        this.leftAvgRowSize = leftAvgRowSize;
+        this.rightAvgRowSize = rightAvgRowSize;
+        this.leftRowCount = leftRowCount;
+        this.rightRowCount = rightRowCount;
+        this.tryDistinctBuildRow = tryDistinctBuildRow;
+    }
+
+    @Override
+    @SuppressWarnings("Unchecked")

Review comment:
       typo: `Unchecked` -> `unchecked`

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LongHashJoinGenerator;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.HashJoinOperator;
+import org.apache.flink.table.runtime.operators.join.HashJoinType;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** Batch {@link ExecNode} for Hash Join. */
+public class BatchExecHashJoin extends ExecNodeBase<RowData> implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;
+    private final boolean leftIsBuild;
+    private final int leftAvgRowSize;
+    private final int rightAvgRowSize;
+    private final long leftRowCount;
+    private final long rightRowCount;
+    private final boolean tryDistinctBuildRow;
+
+    public BatchExecHashJoin(
+            FlinkJoinType joinType,
+            int[] leftKeys,
+            int[] rightKeys,
+            boolean[] filterNulls,
+            RexNode nonEquiConditions,
+            int leftAvgRowSize,
+            int rightAvgRowSize,
+            long leftRowCount,
+            long rightRowCount,
+            boolean leftIsBuild,
+            boolean tryDistinctBuildRow,
+            List<ExecEdge> inputEdges,
+            LogicalType outputType,

Review comment:
       RowType

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LongHashJoinGenerator;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.HashJoinOperator;
+import org.apache.flink.table.runtime.operators.join.HashJoinType;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** Batch {@link ExecNode} for Hash Join. */
+public class BatchExecHashJoin extends ExecNodeBase<RowData> implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;
+    private final boolean leftIsBuild;
+    private final int leftAvgRowSize;
+    private final int rightAvgRowSize;
+    private final long leftRowCount;
+    private final long rightRowCount;
+    private final boolean tryDistinctBuildRow;
+
+    public BatchExecHashJoin(
+            FlinkJoinType joinType,
+            int[] leftKeys,
+            int[] rightKeys,
+            boolean[] filterNulls,
+            RexNode nonEquiConditions,

Review comment:
       add `@Nullable` annotation

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashJoin.scala
##########
@@ -174,7 +163,37 @@ class BatchExecHashJoin(
 
   //~ ExecNode methods -----------------------------------------------------------
 
-  override def getInputEdges: util.List[ExecEdge] = {
+  override def translateToExecNode(): ExecNode[_] = {
+
+    val nonEquiPredicates = if (!joinInfo.isEqui) {
+      joinInfo.getRemaining(getCluster.getRexBuilder)
+    } else {
+      null
+    }
+    val mq = getCluster.getMetadataQuery
+    val leftRowSize = Util.first(mq.getAverageRowSize(left), 24).toInt
+    val leftRowCount = Util.first(mq.getRowCount(left), 200000).toLong
+    val rightRowSize = Util.first(mq.getAverageRowSize(right), 24).toInt
+    val rightRowCount = Util.first(mq.getRowCount(right), 200000).toLong
+    new BatchExecHashJoin(
+        JoinTypeUtil.getFlinkJoinType(joinType),
+        leftKeys,
+        rightKeys,
+        filterNulls,
+        nonEquiPredicates,
+        leftRowSize,
+        rightRowSize,
+        leftRowCount,
+        rightRowCount,
+        leftIsBuild,
+        tryDistinctBuildRow,
+        getInputEdges,
+        FlinkTypeFactory.toLogicalRowType(getRowType),
+        getRelDetailedDescription
+    )
+  }
+
+  def getInputEdges: util.List[ExecEdge] = {

Review comment:
       add `private`

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecNestedLoopJoin.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.NestedLoopJoinCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+
+/** Batch {@link ExecNode} for Nested-loop Join. */
+public class BatchExecNestedLoopJoin extends ExecNodeBase<RowData>
+        implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final RexNode condition;
+    private final boolean leftIsBuild;
+    private final boolean singleRowJoin;
+
+    public BatchExecNestedLoopJoin(
+            FlinkJoinType joinType,
+            RexNode condition,
+            boolean leftIsBuild,
+            boolean singleRowJoin,
+            List<ExecEdge> inputEdges,
+            LogicalType outputType,

Review comment:
       RowType

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortMergeJoin.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.SortUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.SortMergeJoinOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import scala.Tuple3;
+
+/** Batch {@link ExecNode} for Sort Merge Join. */
+public class BatchExecSortMergeJoin extends ExecNodeBase<RowData>
+        implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;
+    private final boolean leftIsSmaller;
+
+    public BatchExecSortMergeJoin(
+            FlinkJoinType joinType,
+            int[] leftKeys,
+            int[] rightKeys,
+            boolean[] filterNulls,
+            RexNode nonEquiConditions,
+            boolean leftIsSmaller,
+            List<ExecEdge> inputEdges,
+            LogicalType outputType,
+            String description) {
+        super(inputEdges, outputType, description);
+        this.joinType = joinType;
+
+        Preconditions.checkArgument(leftKeys.length == rightKeys.length);
+        Preconditions.checkArgument(leftKeys.length == filterNulls.length);
+        this.leftKeys = leftKeys;
+        this.rightKeys = rightKeys;
+        this.filterNulls = filterNulls;
+        this.nonEquiConditions = nonEquiConditions;
+        this.leftIsSmaller = leftIsSmaller;
+    }
+
+    @Override
+    @SuppressWarnings("Unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        TableConfig config = planner.getTableConfig();
+
+        Transformation<RowData> lInput =
+                (Transformation<RowData>) getInputNodes().get(0).translateToPlan(planner);
+        Transformation<RowData> rInput =
+                (Transformation<RowData>) getInputNodes().get(1).translateToPlan(planner);
+
+        // get type
+        RowType lType = (RowType) getInputNodes().get(0).getOutputType();
+        RowType rType = (RowType) getInputNodes().get(1).getOutputType();
+
+        LogicalType[] keyFieldTypes =
+                IntStream.of(leftKeys).mapToObj(lType::getTypeAt).toArray(LogicalType[]::new);
+        RowType keyType = RowType.of(keyFieldTypes);
+
+        GeneratedJoinCondition condFunc =
+                JoinUtil.generateConditionFunction(config, nonEquiConditions, lType, rType);
+
+        long externalBufferMemory =
+                MemorySize.parse(
+                                config.getConfiguration()
+                                        .getString(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY))
+                        .getBytes();
+        long sortMemory =
+                MemorySize.parse(
+                                config.getConfiguration()
+                                        .getString(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_RESOURCE_SORT_MEMORY))
+                        .getBytes();
+        int externalBufferNum = 1;
+        if (joinType == FlinkJoinType.FULL) {
+            externalBufferNum = 2;
+        }
+
+        long managedMemory = externalBufferMemory * externalBufferNum + sortMemory * 2;
+
+        SortCodeGenerator leftSortGen = newSortGen(config, leftKeys, lType);
+        SortCodeGenerator rightSortGen = newSortGen(config, rightKeys, rType);
+
+        int[] keyPositions = IntStream.range(0, leftKeys.length).toArray();
+        SortMergeJoinOperator operator =
+                new SortMergeJoinOperator(
+                        1.0 * externalBufferMemory / managedMemory,
+                        joinType,
+                        leftIsSmaller,
+                        condFunc,
+                        ProjectionCodeGenerator.generateProjection(
+                                new CodeGeneratorContext(config),
+                                "SMJProjection",
+                                lType,
+                                keyType,
+                                leftKeys),
+                        ProjectionCodeGenerator.generateProjection(
+                                new CodeGeneratorContext(config),
+                                "SMJProjection",
+                                rType,
+                                keyType,
+                                rightKeys),
+                        leftSortGen.generateNormalizedKeyComputer("LeftComputer"),
+                        leftSortGen.generateRecordComparator("LeftComparator"),
+                        rightSortGen.generateNormalizedKeyComputer("RightComputer"),
+                        rightSortGen.generateRecordComparator("RightComparator"),
+                        newSortGen(config, keyPositions, keyType)
+                                .generateRecordComparator("KeyComparator"),
+                        filterNulls);
+
+        TwoInputTransformation<RowData, RowData, RowData> ret =
+                ExecNodeUtil.createTwoInputTransformation(
+                        lInput,
+                        rInput,
+                        getDesc(),
+                        SimpleOperatorFactory.of(operator),
+                        InternalTypeInfo.of(getOutputType()),
+                        rInput.getParallelism(),
+                        managedMemory);
+
+        if (inputsContainSingleton()) {
+            ret.setParallelism(1);
+            ret.setMaxParallelism(1);
+        }
+        return ret;
+    }
+
+    private SortCodeGenerator newSortGen(TableConfig config, int[] originalKeys, RowType type) {
+        boolean[] originalOrders = new boolean[originalKeys.length];
+        Arrays.fill(originalOrders, true);
+        boolean[] nullsIsLast = SortUtil.getNullDefaultOrders(originalOrders);
+        Tuple3<int[], boolean[], boolean[]> ret =
+                SortUtil.deduplicateSortKeys(originalKeys, originalOrders, nullsIsLast);

Review comment:
       add some comments to explain what's each element in Tuple3?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LongHashJoinGenerator;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.HashJoinOperator;
+import org.apache.flink.table.runtime.operators.join.HashJoinType;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** Batch {@link ExecNode} for Hash Join. */
+public class BatchExecHashJoin extends ExecNodeBase<RowData> implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;
+    private final boolean leftIsBuild;
+    private final int leftAvgRowSize;
+    private final int rightAvgRowSize;
+    private final long leftRowCount;
+    private final long rightRowCount;
+    private final boolean tryDistinctBuildRow;
+
+    public BatchExecHashJoin(
+            FlinkJoinType joinType,
+            int[] leftKeys,
+            int[] rightKeys,
+            boolean[] filterNulls,
+            RexNode nonEquiConditions,
+            int leftAvgRowSize,
+            int rightAvgRowSize,
+            long leftRowCount,
+            long rightRowCount,
+            boolean leftIsBuild,
+            boolean tryDistinctBuildRow,
+            List<ExecEdge> inputEdges,
+            LogicalType outputType,
+            String description) {
+        super(inputEdges, outputType, description);
+        this.joinType = joinType;
+
+        Preconditions.checkArgument(leftKeys.length == rightKeys.length);

Review comment:
       `leftKeys` should be non-empty.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LongHashJoinGenerator;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.HashJoinOperator;
+import org.apache.flink.table.runtime.operators.join.HashJoinType;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** Batch {@link ExecNode} for Hash Join. */
+public class BatchExecHashJoin extends ExecNodeBase<RowData> implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;
+    private final boolean leftIsBuild;
+    private final int leftAvgRowSize;
+    private final int rightAvgRowSize;
+    private final long leftRowCount;
+    private final long rightRowCount;
+    private final boolean tryDistinctBuildRow;
+
+    public BatchExecHashJoin(
+            FlinkJoinType joinType,
+            int[] leftKeys,
+            int[] rightKeys,
+            boolean[] filterNulls,
+            RexNode nonEquiConditions,
+            int leftAvgRowSize,
+            int rightAvgRowSize,
+            long leftRowCount,
+            long rightRowCount,
+            boolean leftIsBuild,
+            boolean tryDistinctBuildRow,
+            List<ExecEdge> inputEdges,
+            LogicalType outputType,
+            String description) {
+        super(inputEdges, outputType, description);
+        this.joinType = joinType;
+
+        Preconditions.checkArgument(leftKeys.length == rightKeys.length);
+        Preconditions.checkArgument(leftKeys.length == filterNulls.length);
+        this.leftKeys = leftKeys;
+        this.rightKeys = rightKeys;
+        this.filterNulls = filterNulls;
+        this.nonEquiConditions = nonEquiConditions;
+        this.leftIsBuild = leftIsBuild;
+        this.leftAvgRowSize = leftAvgRowSize;
+        this.rightAvgRowSize = rightAvgRowSize;
+        this.leftRowCount = leftRowCount;
+        this.rightRowCount = rightRowCount;
+        this.tryDistinctBuildRow = tryDistinctBuildRow;
+    }
+
+    @Override
+    @SuppressWarnings("Unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        TableConfig config = planner.getTableConfig();
+
+        Transformation<RowData> lInput =
+                (Transformation<RowData>) getInputNodes().get(0).translateToPlan(planner);
+        Transformation<RowData> rInput =
+                (Transformation<RowData>) getInputNodes().get(1).translateToPlan(planner);
+
+        // get type
+        RowType lType = (RowType) getInputNodes().get(0).getOutputType();
+        RowType rType = (RowType) getInputNodes().get(1).getOutputType();

Review comment:
       add left/right input node local field ?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LongHashJoinGenerator;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.HashJoinOperator;
+import org.apache.flink.table.runtime.operators.join.HashJoinType;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** Batch {@link ExecNode} for Hash Join. */
+public class BatchExecHashJoin extends ExecNodeBase<RowData> implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;
+    private final boolean leftIsBuild;
+    private final int leftAvgRowSize;
+    private final int rightAvgRowSize;
+    private final long leftRowCount;
+    private final long rightRowCount;
+    private final boolean tryDistinctBuildRow;
+
+    public BatchExecHashJoin(
+            FlinkJoinType joinType,
+            int[] leftKeys,
+            int[] rightKeys,
+            boolean[] filterNulls,
+            RexNode nonEquiConditions,
+            int leftAvgRowSize,
+            int rightAvgRowSize,
+            long leftRowCount,
+            long rightRowCount,
+            boolean leftIsBuild,
+            boolean tryDistinctBuildRow,
+            List<ExecEdge> inputEdges,
+            LogicalType outputType,
+            String description) {
+        super(inputEdges, outputType, description);
+        this.joinType = joinType;
+
+        Preconditions.checkArgument(leftKeys.length == rightKeys.length);
+        Preconditions.checkArgument(leftKeys.length == filterNulls.length);
+        this.leftKeys = leftKeys;
+        this.rightKeys = rightKeys;
+        this.filterNulls = filterNulls;
+        this.nonEquiConditions = nonEquiConditions;
+        this.leftIsBuild = leftIsBuild;
+        this.leftAvgRowSize = leftAvgRowSize;
+        this.rightAvgRowSize = rightAvgRowSize;
+        this.leftRowCount = leftRowCount;
+        this.rightRowCount = rightRowCount;
+        this.tryDistinctBuildRow = tryDistinctBuildRow;
+    }
+
+    @Override
+    @SuppressWarnings("Unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        TableConfig config = planner.getTableConfig();
+
+        Transformation<RowData> lInput =
+                (Transformation<RowData>) getInputNodes().get(0).translateToPlan(planner);
+        Transformation<RowData> rInput =
+                (Transformation<RowData>) getInputNodes().get(1).translateToPlan(planner);
+
+        // get type
+        RowType lType = (RowType) getInputNodes().get(0).getOutputType();
+        RowType rType = (RowType) getInputNodes().get(1).getOutputType();
+
+        LogicalType[] keyFieldTypes =
+                IntStream.of(leftKeys).mapToObj(lType::getTypeAt).toArray(LogicalType[]::new);
+        RowType keyType = RowType.of(keyFieldTypes);
+
+        GeneratedJoinCondition condFunc =
+                JoinUtil.generateConditionFunction(config, nonEquiConditions, lType, rType);
+
+        // projection for equals
+        GeneratedProjection lProj =
+                ProjectionCodeGenerator.generateProjection(
+                        new CodeGeneratorContext(config),
+                        "HashJoinLeftProjection",
+                        lType,
+                        keyType,
+                        leftKeys);
+        GeneratedProjection rProj =
+                ProjectionCodeGenerator.generateProjection(
+                        new CodeGeneratorContext(config),
+                        "HashJoinRightProjection",
+                        rType,
+                        keyType,
+                        rightKeys);
+
+        Transformation<RowData> build;
+        Transformation<RowData> probe;
+        GeneratedProjection bProj;
+        GeneratedProjection pProj;
+        int[] bKeys;
+        int[] pKeys;
+        RowType bType;
+        RowType pType;
+        int bRowSize;
+        long bRowCount;
+        long pRowCount;
+        boolean reverseJoin = !leftIsBuild;
+        if (leftIsBuild) {
+            build = lInput;
+            bProj = lProj;
+            bType = lType;
+            bRowSize = leftAvgRowSize;
+            bRowCount = leftRowCount;
+            bKeys = leftKeys;
+
+            probe = rInput;
+            pProj = rProj;
+            pType = rType;
+            pRowCount = leftRowCount;
+            pKeys = rightKeys;
+        } else {
+            build = rInput;
+            bProj = rProj;
+            bType = rType;
+            bRowSize = rightAvgRowSize;
+            bRowCount = rightRowCount;
+            bKeys = rightKeys;
+
+            probe = lInput;
+            pProj = lProj;
+            pType = lType;
+            pRowCount = leftRowCount;
+            pKeys = leftKeys;
+        }

Review comment:
       give build/probe fields directly in the constructor ?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashJoin.scala
##########
@@ -174,7 +163,37 @@ class BatchExecHashJoin(
 
   //~ ExecNode methods -----------------------------------------------------------

Review comment:
       remove this line

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashJoin.scala
##########
@@ -174,7 +163,37 @@ class BatchExecHashJoin(
 
   //~ ExecNode methods -----------------------------------------------------------
 
-  override def getInputEdges: util.List[ExecEdge] = {
+  override def translateToExecNode(): ExecNode[_] = {
+

Review comment:
       remove empty line

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.LongHashJoinGenerator;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.generated.GeneratedProjection;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.HashJoinOperator;
+import org.apache.flink.table.runtime.operators.join.HashJoinType;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** Batch {@link ExecNode} for Hash Join. */
+public class BatchExecHashJoin extends ExecNodeBase<RowData> implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;

Review comment:
       add `@Nullable` annotation

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortMergeJoin.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.SortUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.SortMergeJoinOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import scala.Tuple3;
+
+/** Batch {@link ExecNode} for Sort Merge Join. */
+public class BatchExecSortMergeJoin extends ExecNodeBase<RowData>
+        implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;
+    private final boolean leftIsSmaller;
+
+    public BatchExecSortMergeJoin(
+            FlinkJoinType joinType,
+            int[] leftKeys,
+            int[] rightKeys,
+            boolean[] filterNulls,
+            RexNode nonEquiConditions,
+            boolean leftIsSmaller,
+            List<ExecEdge> inputEdges,
+            LogicalType outputType,

Review comment:
       RowType

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortMergeJoin.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.SortUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.SortMergeJoinOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import scala.Tuple3;
+
+/** Batch {@link ExecNode} for Sort Merge Join. */
+public class BatchExecSortMergeJoin extends ExecNodeBase<RowData>
+        implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;

Review comment:
       add @Nullable annotation

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortMergeJoin.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.SortUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.SortMergeJoinOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import scala.Tuple3;
+
+/** Batch {@link ExecNode} for Sort Merge Join. */
+public class BatchExecSortMergeJoin extends ExecNodeBase<RowData>
+        implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;
+    private final boolean leftIsSmaller;
+
+    public BatchExecSortMergeJoin(
+            FlinkJoinType joinType,
+            int[] leftKeys,
+            int[] rightKeys,
+            boolean[] filterNulls,
+            RexNode nonEquiConditions,
+            boolean leftIsSmaller,
+            List<ExecEdge> inputEdges,
+            LogicalType outputType,
+            String description) {
+        super(inputEdges, outputType, description);
+        this.joinType = joinType;
+
+        Preconditions.checkArgument(leftKeys.length == rightKeys.length);
+        Preconditions.checkArgument(leftKeys.length == filterNulls.length);
+        this.leftKeys = leftKeys;
+        this.rightKeys = rightKeys;
+        this.filterNulls = filterNulls;
+        this.nonEquiConditions = nonEquiConditions;
+        this.leftIsSmaller = leftIsSmaller;
+    }
+
+    @Override
+    @SuppressWarnings("Unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        TableConfig config = planner.getTableConfig();
+
+        Transformation<RowData> lInput =
+                (Transformation<RowData>) getInputNodes().get(0).translateToPlan(planner);
+        Transformation<RowData> rInput =
+                (Transformation<RowData>) getInputNodes().get(1).translateToPlan(planner);
+
+        // get type
+        RowType lType = (RowType) getInputNodes().get(0).getOutputType();
+        RowType rType = (RowType) getInputNodes().get(1).getOutputType();
+
+        LogicalType[] keyFieldTypes =
+                IntStream.of(leftKeys).mapToObj(lType::getTypeAt).toArray(LogicalType[]::new);
+        RowType keyType = RowType.of(keyFieldTypes);
+
+        GeneratedJoinCondition condFunc =
+                JoinUtil.generateConditionFunction(config, nonEquiConditions, lType, rType);
+
+        long externalBufferMemory =
+                MemorySize.parse(
+                                config.getConfiguration()
+                                        .getString(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY))
+                        .getBytes();
+        long sortMemory =
+                MemorySize.parse(
+                                config.getConfiguration()
+                                        .getString(
+                                                ExecutionConfigOptions
+                                                        .TABLE_EXEC_RESOURCE_SORT_MEMORY))
+                        .getBytes();

Review comment:
       extract an utility method ?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortMergeJoin.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.SortUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.SortMergeJoinOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import scala.Tuple3;
+
+/** Batch {@link ExecNode} for Sort Merge Join. */
+public class BatchExecSortMergeJoin extends ExecNodeBase<RowData>
+        implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;
+    private final boolean leftIsSmaller;
+
+    public BatchExecSortMergeJoin(
+            FlinkJoinType joinType,
+            int[] leftKeys,
+            int[] rightKeys,
+            boolean[] filterNulls,
+            RexNode nonEquiConditions,

Review comment:
       add @Nullable annotation

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortMergeJoin.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.SortUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.SortMergeJoinOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import scala.Tuple3;
+
+/** Batch {@link ExecNode} for Sort Merge Join. */
+public class BatchExecSortMergeJoin extends ExecNodeBase<RowData>
+        implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;
+    private final boolean leftIsSmaller;
+
+    public BatchExecSortMergeJoin(
+            FlinkJoinType joinType,
+            int[] leftKeys,
+            int[] rightKeys,
+            boolean[] filterNulls,
+            RexNode nonEquiConditions,
+            boolean leftIsSmaller,
+            List<ExecEdge> inputEdges,
+            LogicalType outputType,
+            String description) {
+        super(inputEdges, outputType, description);
+        this.joinType = joinType;
+
+        Preconditions.checkArgument(leftKeys.length == rightKeys.length);
+        Preconditions.checkArgument(leftKeys.length == filterNulls.length);
+        this.leftKeys = leftKeys;
+        this.rightKeys = rightKeys;
+        this.filterNulls = filterNulls;
+        this.nonEquiConditions = nonEquiConditions;
+        this.leftIsSmaller = leftIsSmaller;
+    }
+
+    @Override
+    @SuppressWarnings("Unchecked")
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        TableConfig config = planner.getTableConfig();
+
+        Transformation<RowData> lInput =
+                (Transformation<RowData>) getInputNodes().get(0).translateToPlan(planner);
+        Transformation<RowData> rInput =
+                (Transformation<RowData>) getInputNodes().get(1).translateToPlan(planner);
+
+        // get type
+        RowType lType = (RowType) getInputNodes().get(0).getOutputType();
+        RowType rType = (RowType) getInputNodes().get(1).getOutputType();
+

Review comment:
       add left/right input node local variable ?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSortMergeJoin.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+import org.apache.flink.table.planner.plan.utils.SortUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.SortMergeJoinOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rex.RexNode;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import scala.Tuple3;
+
+/** Batch {@link ExecNode} for Sort Merge Join. */
+public class BatchExecSortMergeJoin extends ExecNodeBase<RowData>
+        implements BatchExecNode<RowData> {
+
+    private final FlinkJoinType joinType;
+    private final int[] leftKeys;
+    private final int[] rightKeys;
+    private final boolean[] filterNulls;
+    private final RexNode nonEquiConditions;
+    private final boolean leftIsSmaller;
+
+    public BatchExecSortMergeJoin(
+            FlinkJoinType joinType,
+            int[] leftKeys,
+            int[] rightKeys,
+            boolean[] filterNulls,
+            RexNode nonEquiConditions,
+            boolean leftIsSmaller,
+            List<ExecEdge> inputEdges,
+            LogicalType outputType,
+            String description) {
+        super(inputEdges, outputType, description);
+        this.joinType = joinType;
+
+        Preconditions.checkArgument(leftKeys.length == rightKeys.length);

Review comment:
       `leftKeys` should not be empty




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774",
       "triggerID" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "triggerType" : "PUSH"
     }, {
       "hash" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11790",
       "triggerID" : "6dd3de68f91d979bdb4288f501fc703fe4317ce5",
       "triggerType" : "PUSH"
     }, {
       "hash" : "7422b497591e2888b6ec9cc4136d5dffa46b550d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11813",
       "triggerID" : "7422b497591e2888b6ec9cc4136d5dffa46b550d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0ca863216b6a5d5e54f833bacf6eb5c794bb142d",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11868",
       "triggerID" : "0ca863216b6a5d5e54f833bacf6eb5c794bb142d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5789214ded1d1654cdeb384aaad5afa4337ac3d8",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11908",
       "triggerID" : "5789214ded1d1654cdeb384aaad5afa4337ac3d8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 61ab76b08a6b4603d6f9dd554bddd742a8d6fde9 UNKNOWN
   * 0ca863216b6a5d5e54f833bacf6eb5c794bb142d Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11868) 
   * 5789214ded1d1654cdeb384aaad5afa4337ac3d8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11908) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752019267


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 09d3419bc7ef88969ad62f2d19a66e586610bdcf (Tue Dec 29 10:01:37 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-20783).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     }, {
       "hash" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "61ab76b08a6b4603d6f9dd554bddd742a8d6fde9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774",
       "triggerID" : "dc919c934588343a3a4cbfb55e75f0f7fe325a58",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b87220ccdee330380bc015427b9f4b445173e324 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11721) 
   * 61ab76b08a6b4603d6f9dd554bddd742a8d6fde9 UNKNOWN
   * dc919c934588343a3a4cbfb55e75f0f7fe325a58 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11774) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 09d3419bc7ef88969ad62f2d19a66e586610bdcf Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14516: [FLINK-20783][table-planner-blink] Separate implementation of ExecNode and PhysicalNode in batch join

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14516:
URL: https://github.com/apache/flink/pull/14516#issuecomment-752024061


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11442",
       "triggerID" : "09d3419bc7ef88969ad62f2d19a66e586610bdcf",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718",
       "triggerID" : "5601ca0f26b9fdb130b0a030e7326d8b1287c76c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b87220ccdee330380bc015427b9f4b445173e324",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "b87220ccdee330380bc015427b9f4b445173e324",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5601ca0f26b9fdb130b0a030e7326d8b1287c76c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=11718) 
   * b87220ccdee330380bc015427b9f4b445173e324 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org