You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@hive.apache.org by "Eugene Chung (Jira)" <ji...@apache.org> on 2021/05/13 07:12:00 UTC

[jira] [Updated] (HIVE-24948) Enhancing performance of OrcInputFormat.getSplits with bucket pruning

     [ https://issues.apache.org/jira/browse/HIVE-24948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eugene Chung updated HIVE-24948:
--------------------------------
    Description: 
The summarized flow of generating input splits at Tez AM is like below; (by calling HiveSplitGenerator.initialize())
 # Perform dynamic partition pruning
 # Get the list of InputSplit by calling InputFormat.getSplits()
 [https://github.com/apache/hive/blob/624f62aadc08577cafaa299cfcf17c71fa6cdb3a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java#L260-L260]
 # Perform bucket pruning with the list above if it's possible
 [https://github.com/apache/hive/blob/624f62aadc08577cafaa299cfcf17c71fa6cdb3a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java#L299-L301]

But I observed that the action 2, getting the list of InputSplit, can make big overhead when the inputs are ORC files in HDFS. 

For example, there is a ORC table T partitioned by 'log_date' and each partition is bucketed by a column 'q'. There are 240 buckets in each partition and the size of each bucket(ORC file) is, let's say, 100MB.

The SQL is like this.  
{noformat}
set hive.tez.bucket.pruning=true;
select q, count(*) from T
where log_date between '2020-01-01' and '2020-06-30'
    and q = 'foobar'
group by q;{noformat}
It means there are 240 * 183(days) = 43680 ORC files in the input paths, but thanks to bucket pruning, only 183 files should be processed.

In my company's environment, the whole processing time of the SQL was roughly 5 minutes. However, I've checked that it took more than 3 minutes to make the list of OrcSplit for 43680 ORC files. The logs with tez.am.log.level=DEBUG showed like below;
{noformat}
2021-03-25 01:21:31,850 [DEBUG] [InputInitializer {Map 1} #0] |orc.OrcInputFormat|: getSplits started
...
2021-03-25 01:24:51,435 [DEBUG] [InputInitializer {Map 1} #0] |orc.OrcInputFormat|: getSplits finished
2021-03-25 01:24:51,444 [INFO] [InputInitializer {Map 1} #0] |io.HiveInputFormat|: number of splits 43680
2021-03-25 01:24:51,444 [DEBUG] [InputInitializer {Map 1} #0] |log.PerfLogger|: &lt;/PERFLOG method=getSplits start=1616602891776 end=1616602891776 duration=199668 from=org.apache.hadoop.hive.ql.io.HiveInputFormat&gt;
...
2021-03-25 01:26:03,385 [INFO] [Dispatcher thread {Central}] |app.DAGAppMaster|: DAG completed, dagId=dag_1615862187190_731117_1, dagState=SUCCEEDED {noformat}
43680 - 183 = 43497 InputSplits which consume about 60% of entire processing time are just simply discarded by the action 3, pruneBuckets().

 

With bucket pruning, I think making the whole list of ORC InputSplit is not necessary.

Therefore, I suggest that the flow would be like this;
 # Perform dynamic partition pruning
 # Get the list of InputSplit by calling InputFormat.getSplits()
 ## OrcInputFormat.getSplits() returns the bucket-pruned list if BitSet from FixedBucketPruningOptimizer exists

  was:
The summarized flow of generating input splits at Tez AM is like below; (by calling HiveSplitGenerator.initialize())
 # Perform dynamic partition pruning
 # Get the list of InputSplit by calling InputFormat.getSplits()
 [https://github.com/apache/hive/blob/624f62aadc08577cafaa299cfcf17c71fa6cdb3a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java#L260-L260]
 # Perform bucket pruning with the list above if it's possible
 [https://github.com/apache/hive/blob/624f62aadc08577cafaa299cfcf17c71fa6cdb3a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java#L299-L301]

But I observed that the action 2, getting the list of InputSplit, can make big overhead when the inputs are ORC files in HDFS.

 

For example, there is a ORC table T partitioned by 'log_date' and each partition is bucketed by a column 'q'. There are 240 buckets in each partition and the size of each bucket(ORC file) is, let's say, 100MB.

The SQL is like this.  
{noformat}
set hive.tez.bucket.pruning=true;
select q, count(*) from T
where log_date between '2020-01-01' and '2020-06-30'
    and q = 'foobar'
group by q;{noformat}
It means there are 240 * 183(days) = 43680 ORC files in the input path, but thanks to bucket pruning, only 183 files should be processed.

In my company's environment, the whole processing time of the SQL was roughly 5 minutes. However, I've checked that it took more than 3 minutes to make the list of OrcSplit for 43680 ORC files. The logs with tez.am.log.level=DEBUG showed like below;
{noformat}
2021-03-25 01:21:31,850 [DEBUG] [InputInitializer {Map 1} #0] |orc.OrcInputFormat|: getSplits started
...
2021-03-25 01:24:51,435 [DEBUG] [InputInitializer {Map 1} #0] |orc.OrcInputFormat|: getSplits finished
2021-03-25 01:24:51,444 [INFO] [InputInitializer {Map 1} #0] |io.HiveInputFormat|: number of splits 43680
2021-03-25 01:24:51,444 [DEBUG] [InputInitializer {Map 1} #0] |log.PerfLogger|: &lt;/PERFLOG method=getSplits start=1616602891776 end=1616602891776 duration=199668 from=org.apache.hadoop.hive.ql.io.HiveInputFormat&gt;
...
2021-03-25 01:26:03,385 [INFO] [Dispatcher thread {Central}] |app.DAGAppMaster|: DAG completed, dagId=dag_1615862187190_731117_1, dagState=SUCCEEDED {noformat}
43680 - 183 = 43497 InputSplits which consume about 60% of entire processing time are just simply discarded by the action 3, pruneBuckets().

 

With bucket pruning, I think making the whole list of ORC InputSplit is not necessary.

Therefore, I suggest that the flow would be like this;
 # Perform dynamic partition pruning
 # Get the list of InputSplit by calling InputFormat.getSplits()
 ## OrcInputFormat.getSplits() returns the bucket-pruned list if BitSet from FixedBucketPruningOptimizer exists


> Enhancing performance of OrcInputFormat.getSplits with bucket pruning
> ---------------------------------------------------------------------
>
>                 Key: HIVE-24948
>                 URL: https://issues.apache.org/jira/browse/HIVE-24948
>             Project: Hive
>          Issue Type: Bug
>          Components: ORC, Query Processor, Tez
>            Reporter: Eugene Chung
>            Assignee: Eugene Chung
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.0.0
>
>         Attachments: HIVE-24948_3.1.2.patch
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The summarized flow of generating input splits at Tez AM is like below; (by calling HiveSplitGenerator.initialize())
>  # Perform dynamic partition pruning
>  # Get the list of InputSplit by calling InputFormat.getSplits()
>  [https://github.com/apache/hive/blob/624f62aadc08577cafaa299cfcf17c71fa6cdb3a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java#L260-L260]
>  # Perform bucket pruning with the list above if it's possible
>  [https://github.com/apache/hive/blob/624f62aadc08577cafaa299cfcf17c71fa6cdb3a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java#L299-L301]
> But I observed that the action 2, getting the list of InputSplit, can make big overhead when the inputs are ORC files in HDFS. 
> For example, there is a ORC table T partitioned by 'log_date' and each partition is bucketed by a column 'q'. There are 240 buckets in each partition and the size of each bucket(ORC file) is, let's say, 100MB.
> The SQL is like this.  
> {noformat}
> set hive.tez.bucket.pruning=true;
> select q, count(*) from T
> where log_date between '2020-01-01' and '2020-06-30'
>     and q = 'foobar'
> group by q;{noformat}
> It means there are 240 * 183(days) = 43680 ORC files in the input paths, but thanks to bucket pruning, only 183 files should be processed.
> In my company's environment, the whole processing time of the SQL was roughly 5 minutes. However, I've checked that it took more than 3 minutes to make the list of OrcSplit for 43680 ORC files. The logs with tez.am.log.level=DEBUG showed like below;
> {noformat}
> 2021-03-25 01:21:31,850 [DEBUG] [InputInitializer {Map 1} #0] |orc.OrcInputFormat|: getSplits started
> ...
> 2021-03-25 01:24:51,435 [DEBUG] [InputInitializer {Map 1} #0] |orc.OrcInputFormat|: getSplits finished
> 2021-03-25 01:24:51,444 [INFO] [InputInitializer {Map 1} #0] |io.HiveInputFormat|: number of splits 43680
> 2021-03-25 01:24:51,444 [DEBUG] [InputInitializer {Map 1} #0] |log.PerfLogger|: &lt;/PERFLOG method=getSplits start=1616602891776 end=1616602891776 duration=199668 from=org.apache.hadoop.hive.ql.io.HiveInputFormat&gt;
> ...
> 2021-03-25 01:26:03,385 [INFO] [Dispatcher thread {Central}] |app.DAGAppMaster|: DAG completed, dagId=dag_1615862187190_731117_1, dagState=SUCCEEDED {noformat}
> 43680 - 183 = 43497 InputSplits which consume about 60% of entire processing time are just simply discarded by the action 3, pruneBuckets().
>  
> With bucket pruning, I think making the whole list of ORC InputSplit is not necessary.
> Therefore, I suggest that the flow would be like this;
>  # Perform dynamic partition pruning
>  # Get the list of InputSplit by calling InputFormat.getSplits()
>  ## OrcInputFormat.getSplits() returns the bucket-pruned list if BitSet from FixedBucketPruningOptimizer exists



--
This message was sent by Atlassian Jira
(v8.3.4#803005)