Posted to issues@hive.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2017/06/07 04:59:18 UTC

[jira] [Comment Edited] (HIVE-16840) Investigate the performance of order by limit in HoS

    [ https://issues.apache.org/jira/browse/HIVE-16840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16040150#comment-16040150 ] 

liyunzhang_intel edited comment on HIVE-16840 at 6/7/17 4:59 AM:
-----------------------------------------------------------------

[~xuefuz],[~lirui],[~Ferd], [~csun]:
 Here are 2 possible solutions:
1. Add an extra reduce stage to save the result of the order by, then run a new job that executes select * from (tmp result of order) limit N.
2. Create a SortByLimitShuffler similar to [SortByShuffler|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java], and implement order by limit N as follows:
{code}
   val composite1 = sc.parallelize(1 to 200, 10).map(p=>(1-p,p)).sortByKey().zipWithIndex().filter{case (_, idx) => idx < 5}

{code}
sortByKey+zipWithIndex+filter together implement orderByLimit. If we go this way, we may need to remove the limit operator from the reduce tree.
Comparing option 1 and option 2:
The disadvantage of option 1 is that we create an extra job to load the sorted result and apply the limit. However, the sort itself is executed in parallel.
The disadvantage of option 2 is that {{zipWithIndex}} [triggers|https://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/rdd/RDD.html#zipWithIndex()] a Spark job when the RDD contains more than one partition, which costs extra time.
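
To make the cost in option 2 concrete, here is a minimal sketch in plain Scala (no Spark dependency) of why a global index needs an extra pass. It assumes each inner Seq stands in for one partition of an already sorted RDD. The names ZipWithIndexSketch, startOffsets, zipWithGlobalIndex and orderByLimit are hypothetical and are not Hive or Spark APIs. It only illustrates the idea: every partition has to be counted before any row can be numbered.

```scala
object ZipWithIndexSketch {
  // Hypothetical stand-in for a globally sorted RDD: one inner Seq per partition.
  type Partitions[T] = Seq[Seq[T]]

  // Pass 1 (the extra job): count each partition and turn the counts into
  // start offsets. The count of the last partition is dropped by init,
  // because no later partition needs it.
  def startOffsets[T](parts: Partitions[T]): Seq[Long] =
    parts.map(_.size.toLong).scanLeft(0L)(_ + _).init

  // Pass 2: number the rows of each partition from that partition's offset.
  def zipWithGlobalIndex[T](parts: Partitions[T]): Partitions[(T, Long)] =
    parts.zip(startOffsets(parts)).map { case (part, start) =>
      part.zipWithIndex.map { case (row, i) => (row, start + i) }
    }

  // "order by ... limit n" on sorted partitions: keep rows whose index is < n.
  def orderByLimit[T](sorted: Partitions[T], n: Int): Seq[T] =
    zipWithGlobalIndex(sorted).flatMap(_.collect { case (row, idx) if idx < n => row })

  def main(args: Array[String]): Unit = {
    // Same data as the snippet above: keys 1-p for p in 1..200, sorted
    // ascending and cut into 10 partitions of 20 rows each.
    val sorted: Partitions[(Int, Int)] =
      (1 to 200).map(p => (1 - p, p)).sortBy(_._1).grouped(20).toVector
    println(startOffsets(sorted))
    println(orderByLimit(sorted, 5))
  }
}
```

With this data the offsets are 0, 20, ..., 180, and orderByLimit(sorted, 5) keeps the rows (-199,200) through (-195,196). All of them come from the first partition, yet every partition except the last is still counted first.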

I would appreciate any suggestions. If you have any ideas about this, please let me know.


was (Author: kellyzly):
[~xuefuz],[~lirui],[~Ferd], [~csun]:
 Here are 2 possible solutions:
1. Add an extra reduce stage to save the result of the order by, then run a new job that executes select * from (tmp result of order) limit N.
2. Create a SortByLimitShuffler and implement order by limit N as follows:
{code}
   val composite1 = sc.parallelize(1 to 200, 10).map(p=>(1-p,p)).sortByKey().zipWithIndex().filter{case (_, idx) => idx < 5}

{code}
sortByKey+zipWithIndex+filter together implement orderByLimit. If we go this way, we may need to remove the limit operator from the reduce tree.


I would appreciate any suggestions.

> Investigate the performance of order by limit in HoS
> ----------------------------------------------------
>
>                 Key: HIVE-16840
>                 URL: https://issues.apache.org/jira/browse/HIVE-16840
>             Project: Hive
>          Issue Type: Bug
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>
> We found that q17 of TPC-DS hung on 1TB of TPC-DS data.
> {code}
>  select  i_item_id
>        ,i_item_desc
>        ,s_state
>        ,count(ss_quantity) as store_sales_quantitycount
>        ,avg(ss_quantity) as store_sales_quantityave
>        ,stddev_samp(ss_quantity) as store_sales_quantitystdev
>        ,stddev_samp(ss_quantity)/avg(ss_quantity) as store_sales_quantitycov
>        ,count(sr_return_quantity) as_store_returns_quantitycount
>        ,avg(sr_return_quantity) as_store_returns_quantityave
>        ,stddev_samp(sr_return_quantity) as_store_returns_quantitystdev
>        ,stddev_samp(sr_return_quantity)/avg(sr_return_quantity) as store_returns_quantitycov
>        ,count(cs_quantity) as catalog_sales_quantitycount ,avg(cs_quantity) as catalog_sales_quantityave
>        ,stddev_samp(cs_quantity)/avg(cs_quantity) as catalog_sales_quantitystdev
>        ,stddev_samp(cs_quantity)/avg(cs_quantity) as catalog_sales_quantitycov
>  from store_sales
>      ,store_returns
>      ,catalog_sales
>      ,date_dim d1
>      ,date_dim d2
>      ,date_dim d3
>      ,store
>      ,item
>  where d1.d_quarter_name = '2000Q1'
>    and d1.d_date_sk = store_sales.ss_sold_date_sk
>    and item.i_item_sk = store_sales.ss_item_sk
>    and store.s_store_sk = store_sales.ss_store_sk
>    and store_sales.ss_customer_sk = store_returns.sr_customer_sk
>    and store_sales.ss_item_sk = store_returns.sr_item_sk
>    and store_sales.ss_ticket_number = store_returns.sr_ticket_number
>    and store_returns.sr_returned_date_sk = d2.d_date_sk
>    and d2.d_quarter_name in ('2000Q1','2000Q2','2000Q3')
>    and store_returns.sr_customer_sk = catalog_sales.cs_bill_customer_sk
>    and store_returns.sr_item_sk = catalog_sales.cs_item_sk
>    and catalog_sales.cs_sold_date_sk = d3.d_date_sk
>    and d3.d_quarter_name in ('2000Q1','2000Q2','2000Q3')
>  group by i_item_id
>          ,i_item_desc
>          ,s_state
>  order by i_item_id
>          ,i_item_desc
>          ,s_state
> limit 100;
> {code}
> The script hung because only 1 task is used to perform the sort.
> {code}
> STAGE PLANS:
>   Stage: Stage-1
>     Spark
>       Edges:
>         Reducer 10 <- Reducer 9 (SORT, 1)
>         Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 889), Map 11 (PARTITION-LEVEL SORT, 889)
>         Reducer 3 <- Map 12 (PARTITION-LEVEL SORT, 1009), Reducer 2 (PARTITION-LEVEL SORT, 1009)
>         Reducer 4 <- Map 13 (PARTITION-LEVEL SORT, 683), Reducer 3 (PARTITION-LEVEL SORT, 683)
>         Reducer 5 <- Map 14 (PARTITION-LEVEL SORT, 751), Reducer 4 (PARTITION-LEVEL SORT, 751)
>         Reducer 6 <- Map 15 (PARTITION-LEVEL SORT, 826), Reducer 5 (PARTITION-LEVEL SORT, 826)
>         Reducer 7 <- Map 16 (PARTITION-LEVEL SORT, 909), Reducer 6 (PARTITION-LEVEL SORT, 909)
>         Reducer 8 <- Map 17 (PARTITION-LEVEL SORT, 1001), Reducer 7 (PARTITION-LEVEL SORT, 1001)
>         Reducer 9 <- Reducer 8 (GROUP, 2)
> {code}
> The parallelism of the sort edge into Reducer 10 (Reducer 10 <- Reducer 9 (SORT, 1)) is 1. This is an order by limit case, so we use 1 task to ensure correctness, but the performance is poor.


