Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/29 12:37:46 UTC

[GitHub] [flink] wuchong commented on a diff in pull request #20365: [FLINK-26929][table-runtime] Introduce adaptive hash join for batch hash join

wuchong commented on code in PR #20365:
URL: https://github.com/apache/flink/pull/20365#discussion_r933165470


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java:
##########
@@ -189,6 +191,44 @@ protected Transformation<RowData> translateToPlanInternal(
                         joinType.isRightOuter(),
                         joinType == FlinkJoinType.SEMI,
                         joinType == FlinkJoinType.ANTI);
+
+        long hashJoinManagedMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY).getBytes();
+
+        long externalBufferMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY)
+                        .getBytes();
+        long sortMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY).getBytes();
+        int externalBufferNum = 1;
+        if (joinType == FlinkJoinType.FULL) {
+            externalBufferNum = 2;
+        }

Review Comment:
   Could you add comments to explain this?
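   To illustrate what I mean, roughly something like this (this is just my reading of why FULL needs two buffers, so please adjust the wording if the reasoning is off):
   ```java
   // The external buffers are only needed if this hash join later falls back to
   // a sort-merge join. That operator buffers the rows of the current key group;
   // a FULL outer join has to buffer both inputs and therefore needs two
   // ResettableExternalBuffers, while every other join type needs only one.
   int externalBufferNum = 1;
   if (joinType == FlinkJoinType.FULL) {
       externalBufferNum = 2;
   }
   ```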



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashTable.java:
##########
@@ -399,10 +406,37 @@ private boolean prepareNextPartition() throws IOException {
         this.probeMatchedPhase = true;
         this.buildIterVisited = false;
 
+        final int nextRecursionLevel = p.getRecursionLevel() + 1;
+        if (nextRecursionLevel == 2) {
+            LOG.info("Recursive hash join: partition number is " + p.getPartitionNumber());
+        } else if (nextRecursionLevel > MAX_RECURSION_DEPTH) {
+            LOG.info(
+                    String.format(
+                            "Partition number [%s] recursive level more than %s.",
+                            p.getPartitionNumber(), MAX_RECURSION_DEPTH));

Review Comment:
   1. Use slf4j `{}` placeholders instead of `String.format` (see the sketch below).
   2. Could you also add "Process the partition using SortMergeJoin later" to the log message?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashJoin.java:
##########
@@ -189,6 +191,44 @@ protected Transformation<RowData> translateToPlanInternal(
                         joinType.isRightOuter(),
                         joinType == FlinkJoinType.SEMI,
                         joinType == FlinkJoinType.ANTI);
+
+        long hashJoinManagedMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY).getBytes();
+
+        long externalBufferMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY)
+                        .getBytes();
+        long sortMemory =
+                config.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_SORT_MEMORY).getBytes();
+        int externalBufferNum = 1;
+        if (joinType == FlinkJoinType.FULL) {
+            externalBufferNum = 2;
+        }
+        long sortMergeJoinManagedMemory = externalBufferMemory * externalBufferNum + sortMemory * 2;

Review Comment:
   Could you explain a bit more about the math formula?
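   For what it is worth, my own reading of the formula, written as a comment (please correct it if the intent is different):
   ```java
   // Managed memory to reserve in case the hash join falls back to a sort-merge
   // join: that operator sorts both inputs (hence sortMemory * 2) and additionally
   // buffers the rows of the current key group in externalBufferNum spillable
   // buffers (two for FULL outer joins, one otherwise).
   long sortMergeJoinManagedMemory = externalBufferMemory * externalBufferNum + sortMemory * 2;
   ```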



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -220,6 +221,15 @@ public class ExecutionConfigOptions {
                                     + "The larger the memory, the higher the compression ratio, "
                                     + "but more memory resource will be consumed by the job.");
 
+    @Experimental
+    public static final ConfigOption<Long> TABLE_EXEC_HASH_JOIN_SPILL_THRESHOLD =
+            key("table.exec.hash-join.spill-threshold")
+                    .longType()
+                    .defaultValue(8 * 1024 * 1024 * 1024L)

Review Comment:
   Is there any reason to use this value?
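   (For reference, the literal evaluates to 8 * 1024 * 1024 * 1024 = 8,589,934,592 bytes, i.e. 8 GiB.)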



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java:
##########
@@ -220,6 +221,15 @@ public class ExecutionConfigOptions {
                                     + "The larger the memory, the higher the compression ratio, "
                                     + "but more memory resource will be consumed by the job.");
 
+    @Experimental
+    public static final ConfigOption<Long> TABLE_EXEC_HASH_JOIN_SPILL_THRESHOLD =
+            key("table.exec.hash-join.spill-threshold")
+                    .longType()
+                    .defaultValue(8 * 1024 * 1024 * 1024L)

Review Comment:
   Please use the `MemorySize` type, which is easier to configure.
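   Something along these lines, keeping the key and the 8 GiB default from the diff (the final name, default and description are of course up to you):
   ```java
   import org.apache.flink.annotation.Experimental;
   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.MemorySize;

   import static org.apache.flink.configuration.ConfigOptions.key;

   // MemorySize lets users write readable values such as "8 gb" or "512 mb"
   // instead of a raw byte count, and still exposes getBytes() for the planner.
   @Experimental
   public static final ConfigOption<MemorySize> TABLE_EXEC_HASH_JOIN_SPILL_THRESHOLD =
           key("table.exec.hash-join.spill-threshold")
                   .memoryType()
                   .defaultValue(MemorySize.parse("8 gb"));
   ```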



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org