You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2022/04/05 09:02:08 UTC

[hive] branch master updated: HIVE-26110: Bulk insert into partitioned table creates lots of files in iceberg (#3174) (Adam Szita, reviewed by Marton Bod, Peter Vary and Rajesh Balamohan)

This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new be528a2050 HIVE-26110: Bulk insert into partitioned table creates lots of files in iceberg (#3174) (Adam Szita, reviewed by Marton Bod, Peter Vary and Rajesh Balamohan)
be528a2050 is described below

commit be528a2050b3164ac7dd779cff932c5c6449025b
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Tue Apr 5 11:01:56 2022 +0200

    HIVE-26110: Bulk insert into partitioned table creates lots of files in iceberg (#3174) (Adam Szita, reviewed by Marton Bod, Peter Vary and Rajesh Balamohan)
---
 .../src/test/results/positive/dynamic_partition_writes.q.out       | 4 ++++
 .../hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java      | 7 ++++++-
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out
index 91b3808faf..36fb39c091 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out
@@ -75,6 +75,7 @@ Stage-3
                   Output:["_col0","_col1","_col1"]
                 <-Map 1 [SIMPLE_EDGE] vectorized
                   PARTITION_ONLY_SHUFFLE [RS_13]
+                    PartitionCols:_col1
                     Select Operator [SEL_12] (rows=20 width=91)
                       Output:["_col0","_col1"]
                       TableScan [TS_0] (rows=20 width=91)
@@ -166,6 +167,7 @@ Stage-3
                   Output:["_col0","_col1","iceberg_bucket(_col1, 2)"]
                 <-Map 1 [SIMPLE_EDGE] vectorized
                   PARTITION_ONLY_SHUFFLE [RS_13]
+                    PartitionCols:iceberg_bucket(_col1, 2)
                     Select Operator [SEL_12] (rows=20 width=91)
                       Output:["_col0","_col1"]
                       TableScan [TS_0] (rows=20 width=91)
@@ -257,6 +259,7 @@ Stage-3
                   Output:["_col0","_col1","_col2","_col1","iceberg_bucket(_col2, 3)"]
                 <-Map 1 [SIMPLE_EDGE] vectorized
                   PARTITION_ONLY_SHUFFLE [RS_13]
+                    PartitionCols:_col1, iceberg_bucket(_col2, 3)
                     Select Operator [SEL_12] (rows=20 width=99)
                       Output:["_col0","_col1","_col2"]
                       TableScan [TS_0] (rows=20 width=99)
@@ -387,6 +390,7 @@ Stage-3
                   Output:["_col0","_col1","_col2","_col1","iceberg_bucket(_col2, 3)"]
                 <-Map 1 [SIMPLE_EDGE] vectorized
                   PARTITION_ONLY_SHUFFLE [RS_16]
+                    PartitionCols:_col1, iceberg_bucket(_col2, 3)
                     Select Operator [SEL_15] (rows=4 width=99)
                       Output:["_col0","_col1","_col2"]
                       Filter Operator [FIL_14] (rows=4 width=99)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 03b4124a6f..2668f269b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -648,7 +648,12 @@ public class SortedDynPartitionOptimizer extends Transform {
       ArrayList<ExprNodeDesc> partCols = Lists.newArrayList();
 
       for (Function<List<ExprNodeDesc>, ExprNodeDesc> customSortExpr : customSortExprs) {
-        keyCols.add(customSortExpr.apply(allCols));
+        ExprNodeDesc colExpr = customSortExpr.apply(allCols);
+        // Custom sort expressions are marked as KEYs, which is required for sorting the rows that are going for
+        // a particular reducer instance. They also need to be marked as 'partition' columns for MapReduce shuffle
+        // phase, in order to gather the same keys to the same reducer instances.
+        keyCols.add(colExpr);
+        partCols.add(colExpr);
       }
 
       // we will clone here as RS will update bucket column key with its