Posted to issues@hive.apache.org by "liyunzhang_intel (JIRA)" <ji...@apache.org> on 2017/08/29 08:48:00 UTC

[jira] [Updated] (HIVE-17407) TPC-DS/query65 hangs on HoS in certain settings

     [ https://issues.apache.org/jira/browse/HIVE-17407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

liyunzhang_intel updated HIVE-17407:
------------------------------------
    Description: 
[TPC-DS/query65.sql|https://github.com/kellyzly/hive-testbench/blob/hive14/sample-queries-tpcds/query65.sql] hangs when run with the following setting at the 3TB scale.
{code}
set hive.auto.convert.join.noconditionaltask.size=3000000;
{code}
The explain plan is attached as [explain65|https://issues.apache.org/jira/secure/attachment/12884210/explain.65]. The attached screenshot (hang.PNG) shows that it hangs in Stage 5.

Let's explain why it hangs.
{code}
       Reducer 10 <- Map 9 (GROUP, 1009)
        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 1), Map 5 (PARTITION-LEVEL SORT, 1), Reducer 7 (PARTITION-LEVEL SORT, 1)
        Reducer 3 <- Reducer 10 (PARTITION-LEVEL SORT, 1009), Reducer 2 (PARTITION-LEVEL SORT, 1009)
        Reducer 4 <- Reducer 3 (SORT, 1)
        Reducer 7 <- Map 6 (GROUP PARTITION-LEVEL SORT, 1009)
{code}

The numPartitions of the SparkEdgeProperty that connects Reducer 2 and Reducer 3 is 1. This is because of the following logic in org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils#createReduceWork:
{code}
public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root,
    SparkWork sparkWork) throws SemanticException {
    // ... (excerpt abridged: the new ReduceWork and maxExecutors are set up before this loop) ...
    for (Operator<? extends OperatorDesc> parentOfRoot : root.getParentOperators()) {
      Preconditions.checkArgument(parentOfRoot instanceof ReduceSinkOperator,
          "AssertionError: expected parentOfRoot to be an "
              + "instance of ReduceSinkOperator, but was "
              + parentOfRoot.getClass().getName());
      ReduceSinkOperator reduceSink = (ReduceSinkOperator) parentOfRoot;
      // The ReduceWork's parallelism is the max numReducers among its parent ReduceSinks.
      maxExecutors = Math.max(maxExecutors, reduceSink.getConf().getNumReducers());
    }
    reduceWork.setNumReduceTasks(maxExecutors);
    // ... (rest of the method omitted) ...
{code}
Here the numReducers of every parentOfRoot is 1 (in the explain, the parallelism of Map 1, Map 5 and Reducer 7 is 1), so the numPartitions of the SparkEdgeProperty that connects Reducer 2 and Reducer 3 is 1.
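For intuition, the following minimal, self-contained sketch (plain Java with hypothetical names, not Hive's real classes) illustrates the rule the excerpt above applies: the new ReduceWork's task count is the maximum numReducers over its parent ReduceSinks, so when every parent carries 1 the result stays 1.
{code}
// Simplified stand-in for the logic above; hypothetical names, not Hive's actual classes.
import java.util.Arrays;
import java.util.List;

public class ReduceParallelismSketch {

  // Mimics createReduceWork: the ReduceWork's task count is the max numReducers
  // found on its parent ReduceSink operators.
  static int pickNumReduceTasks(List<Integer> parentNumReducers) {
    int maxExecutors = -1;
    for (int numReducers : parentNumReducers) {
      maxExecutors = Math.max(maxExecutors, numReducers);
    }
    return maxExecutors;
  }

  public static void main(String[] args) {
    // Parents of Reducer 2 in this plan: the ReduceSinks in Map 1, Map 5 and Reducer 7,
    // each compiled with numReducers = 1.
    List<Integer> parentsOfReducer2 = Arrays.asList(1, 1, 1);
    System.out.println(pickNumReduceTasks(parentsOfReducer2));  // prints 1
  }
}
{code}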
To explain further why the parallelism of Map 1, Map 5 and Reducer 7 is 1: the physical plan of the query is
{code}
TS[0]-FIL[50]-RS[2]-JOIN[5]-FIL[49]-SEL[7]-GBY[8]-RS[9]-GBY[10]-SEL[11]-GBY[15]-SEL[16]-RS[33]-JOIN[34]-RS[36]-JOIN[39]-FIL[48]-SEL[41]-RS[42]-SEL[43]-LIM[44]-FS[45]
TS[1]-FIL[51]-RS[4]-JOIN[5]
TS[17]-FIL[53]-RS[19]-JOIN[22]-FIL[52]-SEL[24]-GBY[25]-RS[26]-GBY[27]-RS[38]-JOIN[39]
TS[18]-FIL[54]-RS[21]-JOIN[22]
TS[29]-FIL[55]-RS[31]-JOIN[34]
TS[30]-FIL[56]-RS[32]-JOIN[34]
{code}
The ReduceSinks related to Map 1, Map 5 and Reducer 7 are RS\[31\], RS\[32\] and RS\[33\]. Their parallelism is set by [SemanticAnalyzer#genJoinReduceSinkChild|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java#L8267].
It seems that there is no logical error in the code, but it is not reasonable to use a single task to process such a large amount of data (more than 30 GB). Is there any way to get the query to pass in this situation? (The reason I set hive.auto.convert.join.noconditionaltask.size to 3000000 is that when the join is converted to a map join, it throws a disk error.)
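As a rough back-of-the-envelope check of the "one task for more than 30 GB" point above (simple arithmetic only; 30 GB is the lower bound mentioned in this description):
{code}
// Per-task shuffle volume with 1 task vs. the 1009 tasks used elsewhere in this plan.
public class PerTaskVolumeSketch {
  public static void main(String[] args) {
    double totalGb = 30.0;
    System.out.printf("1 task     -> %.1f GB per task%n", totalGb);          // 30.0 GB through a single task
    System.out.printf("1009 tasks -> %.3f GB per task%n", totalGb / 1009);   // ~0.030 GB (~30 MB) per task
  }
}
{code}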


> TPC-DS/query65 hangs on HoS in certain settings
> -----------------------------------------------
>
>                 Key: HIVE-17407
>                 URL: https://issues.apache.org/jira/browse/HIVE-17407
>             Project: Hive
>          Issue Type: Bug
>            Reporter: liyunzhang_intel
>         Attachments: explain.65, hang.PNG
>


--
This message was sent by Atlassian JIRA
(v6.4.14#64029)