Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2022/04/04 16:04:15 UTC

[GitHub] [hive] szlta opened a new pull request, #3174: HIVE-26110

szlta opened a new pull request, #3174:
URL: https://github.com/apache/hive/pull/3174

   Bulk inserts into a partitioned Iceberg table create a large number of files, because SortedDynPartitionOptimizer doesn't set the key->reducer affinity. That affinity can be established by also marking the sort expressions as 'partition' columns.
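
   To illustrate the effect described above, here is a minimal sketch, not Hive code. It assumes each reducer writes one file per partition value it receives, and it models the missing affinity as a round-robin distribution of rows. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; none of these names exist in Hive.
class ReducerAffinitySketch {

  // Counts distinct (reducer, partition value) pairs, i.e. the number of files
  // written if every reducer opens one file per partition value it receives.
  static int countFiles(List<String> partitionValues, int numReducers, boolean hashByPartition) {
    Set<String> files = new HashSet<>();
    for (int i = 0; i < partitionValues.size(); i++) {
      String value = partitionValues.get(i);
      int reducer = hashByPartition
          ? Math.floorMod(value.hashCode(), numReducers) // same value -> same reducer
          : i % numReducers;                             // round-robin stand-in for "no affinity"
      files.add(reducer + "/" + value);
    }
    return files.size();
  }

  // Twelve rows cycling through three partition values: a, b, c, a, b, c, ...
  static List<String> sampleRows() {
    String[] values = {"a", "b", "c"};
    List<String> rows = new ArrayList<>();
    for (int i = 0; i < 12; i++) {
      rows.add(values[i % 3]);
    }
    return rows;
  }

  public static void main(String[] args) {
    List<String> rows = sampleRows();
    System.out.println("files without affinity: " + countFiles(rows, 4, false));
    System.out.println("files with affinity:    " + countFiles(rows, 4, true));
  }
}
```

   With 4 reducers and 3 partition values, the round-robin case yields 12 files (every reducer sees every value), while hashing on the partition value yields 3, one per value.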


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] rbalamohan commented on a diff in pull request #3174: HIVE-26110

Posted by GitBox <gi...@apache.org>.
rbalamohan commented on code in PR #3174:
URL: https://github.com/apache/hive/pull/3174#discussion_r842299555


##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java:
##########
@@ -648,7 +648,12 @@ public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions, List
       ArrayList<ExprNodeDesc> partCols = Lists.newArrayList();
 
       for (Function<List<ExprNodeDesc>, ExprNodeDesc> customSortExpr : customSortExprs) {
-        keyCols.add(customSortExpr.apply(allCols));
+        ExprNodeDesc colExpr = customSortExpr.apply(allCols);
+        // Custom sort expressions are marked as KEYs, which is required for sorting the rows that are going for
+        // a particular reducer instance. They also need to be marked as 'partition' columns for MapReduce shuffle
+        // phase, in order to gather the same keys to the same reducer instances.
+        keyCols.add(colExpr);
+        partCols.add(colExpr);

Review Comment:
   In the case of Iceberg, "getDPColNames", "getNumDPCols", etc. would not be available in the context. There are some historical assumptions that partition names will be present at the end of the schema. When Iceberg tables are used, these assumptions are not valid.
   
   Would it be good to add "colExpr" to partCols when "partitionPositions && dpCtx.getDPColNames()" are empty?





[GitHub] [hive] rbalamohan commented on a diff in pull request #3174: HIVE-26110

Posted by GitBox <gi...@apache.org>.
rbalamohan commented on code in PR #3174:
URL: https://github.com/apache/hive/pull/3174#discussion_r842526145


##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java:
##########
@@ -648,7 +648,12 @@ public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions, List
       ArrayList<ExprNodeDesc> partCols = Lists.newArrayList();
 
       for (Function<List<ExprNodeDesc>, ExprNodeDesc> customSortExpr : customSortExprs) {
-        keyCols.add(customSortExpr.apply(allCols));
+        ExprNodeDesc colExpr = customSortExpr.apply(allCols);
+        // Custom sort expressions are marked as KEYs, which is required for sorting the rows that are going for
+        // a particular reducer instance. They also need to be marked as 'partition' columns for MapReduce shuffle
+        // phase, in order to gather the same keys to the same reducer instances.
+        keyCols.add(colExpr);
+        partCols.add(colExpr);

Review Comment:
   I didn't realise that the entire "customSortExprs" copying was part of HIVE-25975. This should be fine in this case.
   
   +1 pending tests.





[GitHub] [hive] szlta commented on a diff in pull request #3174: HIVE-26110

Posted by GitBox <gi...@apache.org>.
szlta commented on code in PR #3174:
URL: https://github.com/apache/hive/pull/3174#discussion_r842544398


##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java:
##########
@@ -648,7 +648,12 @@ public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions, List
       ArrayList<ExprNodeDesc> partCols = Lists.newArrayList();
 
       for (Function<List<ExprNodeDesc>, ExprNodeDesc> customSortExpr : customSortExprs) {
-        keyCols.add(customSortExpr.apply(allCols));
+        ExprNodeDesc colExpr = customSortExpr.apply(allCols);
+        // Custom sort expressions are marked as KEYs, which is required for sorting the rows that are going for
+        // a particular reducer instance. They also need to be marked as 'partition' columns for MapReduce shuffle
+        // phase, in order to gather the same keys to the same reducer instances.
+        keyCols.add(colExpr);
+        partCols.add(colExpr);

Review Comment:
   Thx!





[GitHub] [hive] szlta merged pull request #3174: HIVE-26110

Posted by GitBox <gi...@apache.org>.
szlta merged PR #3174:
URL: https://github.com/apache/hive/pull/3174




[GitHub] [hive] szlta commented on a diff in pull request #3174: HIVE-26110

Posted by GitBox <gi...@apache.org>.
szlta commented on code in PR #3174:
URL: https://github.com/apache/hive/pull/3174#discussion_r842445223


##########
ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java:
##########
@@ -648,7 +648,12 @@ public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions, List
       ArrayList<ExprNodeDesc> partCols = Lists.newArrayList();
 
       for (Function<List<ExprNodeDesc>, ExprNodeDesc> customSortExpr : customSortExprs) {
-        keyCols.add(customSortExpr.apply(allCols));
+        ExprNodeDesc colExpr = customSortExpr.apply(allCols);
+        // Custom sort expressions are marked as KEYs, which is required for sorting the rows that are going for
+        // a particular reducer instance. They also need to be marked as 'partition' columns for MapReduce shuffle
+        // phase, in order to gather the same keys to the same reducer instances.
+        keyCols.add(colExpr);
+        partCols.add(colExpr);

Review Comment:
   If customSortExprs are present, then we can be sure that partitionPositions is empty, as per https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java#L592-L596
   As for dpColNames, I'm not sure why it would matter. With Iceberg tables the table schema already contains the partition columns too; it's just that Hive doesn't think of these as partition columns, but rather as regular columns.
   I think the schema should be fine: all columns will serve as VALUE (with Iceberg we want to write the partition values into the file too, because in some cases the spec can have a non-identity type of partition transform), and the ones identified by customSortExpr will additionally be added as KEY, for sorting purposes only.
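
   As a rough illustration of why the KEY columns matter for sorting, the sketch below models a single reducer that keeps one writer open and switches files whenever the partition key of the incoming row changes. This is a simplified model with hypothetical names, not the actual Hive or Iceberg writer logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Hive.
class SortedWriterSketch {

  // Counts how often the reducer opens a new file when it closes the current
  // writer every time the partition key of the incoming row changes.
  static int countWriterOpens(List<String> keysInArrivalOrder) {
    int opens = 0;
    String current = null;
    for (String key : keysInArrivalOrder) {
      if (!key.equals(current)) {
        opens++;
        current = key;
      }
    }
    return opens;
  }

  public static void main(String[] args) {
    List<String> unsorted = Arrays.asList("b", "a", "b", "a");
    List<String> sorted = new ArrayList<>(unsorted);
    Collections.sort(sorted); // what sorting on the KEY columns provides

    System.out.println("opens, unsorted: " + countWriterOpens(unsorted));
    System.out.println("opens, sorted:   " + countWriterOpens(sorted));
  }
}
```

   In this model the unsorted input causes 4 file opens and the sorted input 2, one per distinct key.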


