You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/26 09:10:30 UTC

[GitHub] [flink] lsyldliu opened a new pull request, #20365: [FLINK-26929][table-runtime] [FLINK-26929][table-runtime] Introduce adaptive hash join for batch hash join

lsyldliu opened a new pull request, #20365:
URL: https://github.com/apache/flink/pull/20365

   ## What is the purpose of the change
   
   Introducing adaptive hash join for batch sql hash join.  Adaptive hash join integrates the advantages of hash join and sorted-merge join according to the characteristics of runtime data. The adaptive hash join will try to use hash join strategy firstly, if it failed, fall back to sort merge join. It will improve the stability  of hash join.
   
   In order to solve the problem of data skew, or too much data in the hash table, so here introduce two strategies fallback to sort merge join:
   
   1. If the data spill to disk exceeds the threshold {@code TABLE_EXEC_HASH_JOIN_SPILL_THRESHOLD} in the process of build hash table.
   2. If some partitions are spilled to disk more than three times in the process of hash join.
   
   
   ## Brief change log
   
     - *If the data spill to disk exceeds the threshold {@code TABLE_EXEC_HASH_JOIN_SPILL_THRESHOLD} in the process of build hash table, fallback to sort merge join*
     - *If some partitions are spilled to disk more than three times in the process of hash join, fallback to sort merge join*
   
   
   ## Verifying this change
   This change added tests and can be verified as follows:
   
     - *Added uint tests in `LongHashTableTest` and `BinaryHashTableTest`*
     - *Added uint tests in `Int2AdaptiveHashJoinOperatorTest`*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (JavaDocs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20365: [FLINK-26929][table-runtime] Introduce adaptive hash join for batch hash join

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20365:
URL: https://github.com/apache/flink/pull/20365#discussion_r939466103


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java:
##########
@@ -214,16 +276,121 @@ void collect(RowData row1, RowData row2) throws Exception {
     @Override
     public void close() throws Exception {
         super.close();
+        closeHashTable();
+        condition.close();
+
+        // If fallback to sort merge join during hash join, also need to close the operator
+        if (fallbackSMJInBuild || fallbackSMJInProbe) {
+            sortMergeJoinFunction.close();
+        }
+    }
+
+    private void closeHashTable() {
         if (this.table != null) {
             this.table.close();
             this.table.free();
             this.table = null;
         }
-        condition.close();
+    }
+
+    /**
+     * In the process of building a hash table, if the data written to disk exceeds the threshold,
+     * it means that the build side is larger or there may be a more serious data skew, so fallback
+     * to sort merge join algorithm to deal with it in advance.
+     */
+    private void fallbackSMJProcessPartitionBuildSide(RowData rowData) throws Exception {
+        // spill all the in-memory partitions to disk firstly for return the memory which used to
+        // sort
+        this.table.spillAllInMemoryPartition();

Review Comment:
   I have revert first adaptive hash join strategy due to the concern of performance regression.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20365: [FLINK-26929][table-runtime] Introduce adaptive hash join for batch hash join

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20365:
URL: https://github.com/apache/flink/pull/20365#discussion_r939466103


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java:
##########
@@ -214,16 +276,121 @@ void collect(RowData row1, RowData row2) throws Exception {
     @Override
     public void close() throws Exception {
         super.close();
+        closeHashTable();
+        condition.close();
+
+        // If fallback to sort merge join during hash join, also need to close the operator
+        if (fallbackSMJInBuild || fallbackSMJInProbe) {
+            sortMergeJoinFunction.close();
+        }
+    }
+
+    private void closeHashTable() {
         if (this.table != null) {
             this.table.close();
             this.table.free();
             this.table = null;
         }
-        condition.close();
+    }
+
+    /**
+     * In the process of building a hash table, if the data written to disk exceeds the threshold,
+     * it means that the build side is larger or there may be a more serious data skew, so fallback
+     * to sort merge join algorithm to deal with it in advance.
+     */
+    private void fallbackSMJProcessPartitionBuildSide(RowData rowData) throws Exception {
+        // spill all the in-memory partitions to disk firstly for return the memory which used to
+        // sort
+        this.table.spillAllInMemoryPartition();

Review Comment:
   I have revert first adaptive hash join strategy due to the concern of performance regression. We will do this work in 1.17.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] flinkbot commented on pull request #20365: [FLINK-26929][table-runtime] [FLINK-26929][table-runtime] Introduce adaptive hash join for batch hash join

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20365:
URL: https://github.com/apache/flink/pull/20365#issuecomment-1195236972

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "00ffaa6e76cefd1e5a6225fbd74f8da16a3e2ca0",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "00ffaa6e76cefd1e5a6225fbd74f8da16a3e2ca0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 00ffaa6e76cefd1e5a6225fbd74f8da16a3e2ca0 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] godfreyhe commented on a diff in pull request #20365: [FLINK-26929][table-runtime] Introduce adaptive hash join for batch hash join

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on code in PR #20365:
URL: https://github.com/apache/flink/pull/20365#discussion_r935060142


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -220,6 +221,15 @@ public class ExecutionConfigOptions {
                                     + "The larger the memory, the higher the compression ratio, "
                                     + "but more memory resource will be consumed by the job.");
 
+    @Experimental
+    public static final ConfigOption<Long> TABLE_EXEC_HASH_JOIN_SPILL_THRESHOLD =

Review Comment:
   if this config option is `Experimental`, I think we can move this into `InternalConfigOptions`.  and add annotation `Documentation.Section`.
   
   Whether this configuration is an absolute or relative value ?  `table.exec.resource.hash-join.memory` is just a memory weight(relative value),absolute value is impractical here
   



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/SortMergeJoinOperator.java:
##########
@@ -62,488 +36,44 @@
 public class SortMergeJoinOperator extends TableStreamOperator<RowData>
         implements TwoInputStreamOperator<RowData, RowData, RowData>, BoundedMultiInput {
 
-    private final double externalBufferMemRatio;
-    private final FlinkJoinType type;
-    private final boolean leftIsSmaller;
-    private final boolean[] filterNulls;
-
-    // generated code to cook
-    private GeneratedJoinCondition condFuncCode;
-    private GeneratedProjection projectionCode1;
-    private GeneratedProjection projectionCode2;
-    private GeneratedNormalizedKeyComputer computer1;
-    private GeneratedRecordComparator comparator1;
-    private GeneratedNormalizedKeyComputer computer2;
-    private GeneratedRecordComparator comparator2;
-    private GeneratedRecordComparator genKeyComparator;
+    private final SortMergeJoinFunction sortMergeJoinFunction;

Review Comment:
   It's better we can create an independent commit to do the refactor (no new feature), and apply the changes in new commit. This could make review more happy



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java:
##########
@@ -189,6 +191,44 @@ protected Transformation<RowData> translateToPlanInternal(
                         joinType.isRightOuter(),
                         joinType == FlinkJoinType.SEMI,
                         joinType == FlinkJoinType.ANTI);
+
+        long hashJoinManagedMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY).getBytes();
+
+        long externalBufferMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY)
+                        .getBytes();
+        long sortMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY).getBytes();
+        int externalBufferNum = 1;
+        if (joinType == FlinkJoinType.FULL) {
+            externalBufferNum = 2;
+        }
+        long sortMergeJoinManagedMemory = externalBufferMemory * externalBufferNum + sortMemory * 2;
+
+        // Due to hash join maybe fallback to sort merge join, so here managed memory choose the
+        // large one
+        long managedMemory =

Review Comment:
   nit: Math.max()



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java:
##########
@@ -144,15 +165,34 @@ public void open() throws Exception {
 
     @Override
     public void processElement1(StreamRecord<RowData> element) throws Exception {
-        checkState(!buildEnd, "Should not build ended.");
-        this.table.putBuildRow(element.getValue());
+        // If the data size spilled to disk more than spilledDataThresholdInBytes during build hash
+        // table, fallback to sort merge join early
+        if (!fallbackSMJInBuild) {

Review Comment:
   It's better we avoid checking the fallback operation for each record, which will cause performance degradation. We can check it for partition level when a partition need to spill to disk



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java:
##########
@@ -214,16 +276,121 @@ void collect(RowData row1, RowData row2) throws Exception {
     @Override
     public void close() throws Exception {
         super.close();
+        closeHashTable();
+        condition.close();
+
+        // If fallback to sort merge join during hash join, also need to close the operator
+        if (fallbackSMJInBuild || fallbackSMJInProbe) {
+            sortMergeJoinFunction.close();
+        }
+    }
+
+    private void closeHashTable() {
         if (this.table != null) {
             this.table.close();
             this.table.free();
             this.table = null;
         }
-        condition.close();
+    }
+
+    /**
+     * In the process of building a hash table, if the data written to disk exceeds the threshold,
+     * it means that the build side is larger or there may be a more serious data skew, so fallback
+     * to sort merge join algorithm to deal with it in advance.
+     */
+    private void fallbackSMJProcessPartitionBuildSide(RowData rowData) throws Exception {
+        // spill all the in-memory partitions to disk firstly for return the memory which used to
+        // sort
+        this.table.spillAllInMemoryPartition();
+        LOG.info(
+                "Spill all in memory partitions to disk successfully, fallback to sort merge join.");
+        // initialize sort merge join function
+        initialSortMergeJoinFunction();
+        fallbackSMJInBuild = true;
+
+        // read build side data of all spilled partitions
+        for (BinaryHashPartition p : table.getPartitionsPendingForSMJ()) {
+            // process build side only
+            RowIterator<BinaryRowData> buildSideIter = table.getSpilledPartitionBuildSideIter(p);
+            while (buildSideIter.advanceNext()) {
+                processSortMergeJoinElement1(buildSideIter.getRow());
+            }
+        }
+
+        // close the HashTable
+        closeHashTable();
+
+        // process current record lastly
+        processSortMergeJoinElement1(rowData);
+    }
+
+    /**
+     * If here also exists partitions which spilled to disk more than three time when hash join end,
+     * means that the key in these partitions is very skewed, so fallback to sort merge join
+     * algorithm to process it.
+     */
+    private void fallbackSMJProcessPartition() throws Exception {
+        if (!table.getPartitionsPendingForSMJ().isEmpty()) {
+            // initialize sort merge join operator
+            LOG.info("Fallback to sort merge join.");
+            initialSortMergeJoinFunction();
+            fallbackSMJInProbe = true;
+
+            for (BinaryHashPartition p : table.getPartitionsPendingForSMJ()) {
+                // process build side
+                RowIterator<BinaryRowData> buildSideIter =
+                        table.getSpilledPartitionBuildSideIter(p);
+                while (buildSideIter.advanceNext()) {
+                    processSortMergeJoinElement1(buildSideIter.getRow());
+                }
+
+                // process probe side
+                ProbeIterator probeIter = table.getSpilledPartitionProbeSideIter(p);
+                BinaryRowData probeNext;
+                while ((probeNext = probeIter.next()) != null) {
+                    processSortMergeJoinElement2(probeNext);
+                }
+            }
+
+            // close the HashTable
+            closeHashTable();
+
+            // finish build and probe
+            sortMergeJoinFunction.endInput(1);
+            sortMergeJoinFunction.endInput(2);
+            LOG.info("Finish sort merge join.");

Review Comment:
   The log leads to  misunderstandings, just the hash partitions fallback is finished here.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java:
##########
@@ -189,6 +191,44 @@ protected Transformation<RowData> translateToPlanInternal(
                         joinType.isRightOuter(),
                         joinType == FlinkJoinType.SEMI,
                         joinType == FlinkJoinType.ANTI);
+
+        long hashJoinManagedMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY).getBytes();
+
+        long externalBufferMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY)
+                        .getBytes();
+        long sortMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY).getBytes();
+        int externalBufferNum = 1;
+        if (joinType == FlinkJoinType.FULL) {
+            externalBufferNum = 2;
+        }

Review Comment:
   please extract them into a util class. If we use the max value of HJ memory and SMJ memory, how users define the value of `table.exec.hash-join.spill-threshold` ?
   
   



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/HashJoinOperator.java:
##########
@@ -214,16 +276,121 @@ void collect(RowData row1, RowData row2) throws Exception {
     @Override
     public void close() throws Exception {
         super.close();
+        closeHashTable();
+        condition.close();
+
+        // If fallback to sort merge join during hash join, also need to close the operator
+        if (fallbackSMJInBuild || fallbackSMJInProbe) {
+            sortMergeJoinFunction.close();
+        }
+    }
+
+    private void closeHashTable() {
         if (this.table != null) {
             this.table.close();
             this.table.free();
             this.table = null;
         }
-        condition.close();
+    }
+
+    /**
+     * In the process of building a hash table, if the data written to disk exceeds the threshold,
+     * it means that the build side is larger or there may be a more serious data skew, so fallback
+     * to sort merge join algorithm to deal with it in advance.
+     */
+    private void fallbackSMJProcessPartitionBuildSide(RowData rowData) throws Exception {
+        // spill all the in-memory partitions to disk firstly for return the memory which used to
+        // sort
+        this.table.spillAllInMemoryPartition();

Review Comment:
   Whether it is possible to avoid writing to the disk for the memory data, just write them into sorter and then read the data in the disk into sorter ?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/JoinOperatorUtil.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
+import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.SortMergeJoinFunction;
+import org.apache.flink.table.runtime.operators.join.SortMergeJoinOperator;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.stream.IntStream;
+
+/** Utility for {@link SortMergeJoinOperator}. */
+public class JoinOperatorUtil {

Review Comment:
   If this util is used for SMJ, why don't we rename it to SortMergeJoinOperatorUtil ?
   



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/JoinUtil.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util;
+
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+
+import static org.apache.flink.table.runtime.operators.join.FlinkJoinType.FULL;
+import static org.apache.flink.table.runtime.operators.join.FlinkJoinType.INNER;
+import static org.apache.flink.table.runtime.operators.join.FlinkJoinType.RIGHT;
+
+/** Utility for join. */
+public class JoinUtil {
+
+    public static FlinkJoinType getJoinType(boolean leftOuter, boolean rightOuter) {
+        if (leftOuter && rightOuter) {

Review Comment:
   how to represent SEMI and ANTI ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on a diff in pull request #20365: [FLINK-26929][table-runtime] Introduce adaptive hash join for batch hash join

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on code in PR #20365:
URL: https://github.com/apache/flink/pull/20365#discussion_r939465887


##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/JoinUtil.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util;
+
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+
+import static org.apache.flink.table.runtime.operators.join.FlinkJoinType.FULL;
+import static org.apache.flink.table.runtime.operators.join.FlinkJoinType.INNER;
+import static org.apache.flink.table.runtime.operators.join.FlinkJoinType.RIGHT;
+
+/** Utility for join. */
+public class JoinUtil {
+
+    public static FlinkJoinType getJoinType(boolean leftOuter, boolean rightOuter) {
+        if (leftOuter && rightOuter) {

Review Comment:
   This is specified in related test explicitly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] lsyldliu commented on pull request #20365: [FLINK-26929][table-runtime] Introduce adaptive hash join for batch hash join

Posted by GitBox <gi...@apache.org>.
lsyldliu commented on PR #20365:
URL: https://github.com/apache/flink/pull/20365#issuecomment-1207124329

   Thanks for all review, I've addressed comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] wuchong commented on a diff in pull request #20365: [FLINK-26929][table-runtime] Introduce adaptive hash join for batch hash join

Posted by GitBox <gi...@apache.org>.
wuchong commented on code in PR #20365:
URL: https://github.com/apache/flink/pull/20365#discussion_r933165470


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java:
##########
@@ -189,6 +191,44 @@ protected Transformation<RowData> translateToPlanInternal(
                         joinType.isRightOuter(),
                         joinType == FlinkJoinType.SEMI,
                         joinType == FlinkJoinType.ANTI);
+
+        long hashJoinManagedMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY).getBytes();
+
+        long externalBufferMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY)
+                        .getBytes();
+        long sortMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY).getBytes();
+        int externalBufferNum = 1;
+        if (joinType == FlinkJoinType.FULL) {
+            externalBufferNum = 2;
+        }

Review Comment:
   Could you add comments to explain this?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java:
##########
@@ -399,10 +406,37 @@ private boolean prepareNextPartition() throws IOException {
         this.probeMatchedPhase = true;
         this.buildIterVisited = false;
 
+        final int nextRecursionLevel = p.getRecursionLevel() + 1;
+        if (nextRecursionLevel == 2) {
+            LOG.info("Recursive hash join: partition number is " + p.getPartitionNumber());
+        } else if (nextRecursionLevel > MAX_RECURSION_DEPTH) {
+            LOG.info(
+                    String.format(
+                            "Partition number [%s] recursive level more than %s.",
+                            p.getPartitionNumber(), MAX_RECURSION_DEPTH));

Review Comment:
   1. Use slf4j `{}` instead of `String.format`. 
   2. Add "Process the partition using SortMergeJoin later" in the log?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java:
##########
@@ -189,6 +191,44 @@ protected Transformation<RowData> translateToPlanInternal(
                         joinType.isRightOuter(),
                         joinType == FlinkJoinType.SEMI,
                         joinType == FlinkJoinType.ANTI);
+
+        long hashJoinManagedMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY).getBytes();
+
+        long externalBufferMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY)
+                        .getBytes();
+        long sortMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY).getBytes();
+        int externalBufferNum = 1;
+        if (joinType == FlinkJoinType.FULL) {
+            externalBufferNum = 2;
+        }
+        long sortMergeJoinManagedMemory = externalBufferMemory * externalBufferNum + sortMemory * 2;

Review Comment:
   Could you explain a bit more about the math formula?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -220,6 +221,15 @@ public class ExecutionConfigOptions {
                                     + "The larger the memory, the higher the compression ratio, "
                                     + "but more memory resource will be consumed by the job.");
 
+    @Experimental
+    public static final ConfigOption<Long> TABLE_EXEC_HASH_JOIN_SPILL_THRESHOLD =
+            key("table.exec.hash-join.spill-threshold")
+                    .longType()
+                    .defaultValue(8 * 1024 * 1024 * 1024L)

Review Comment:
   Is there any reason to use this value?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -220,6 +221,15 @@ public class ExecutionConfigOptions {
                                     + "The larger the memory, the higher the compression ratio, "
                                     + "but more memory resource will be consumed by the job.");
 
+    @Experimental
+    public static final ConfigOption<Long> TABLE_EXEC_HASH_JOIN_SPILL_THRESHOLD =
+            key("table.exec.hash-join.spill-threshold")
+                    .longType()
+                    .defaultValue(8 * 1024 * 1024 * 1024L)

Review Comment:
   Please use `MemorySize` type which is easier to configure. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] wuchong merged pull request #20365: [FLINK-26929][table-runtime] Introduce adaptive hash join strategy for batch hash join

Posted by GitBox <gi...@apache.org>.
wuchong merged PR #20365:
URL: https://github.com/apache/flink/pull/20365


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org