Posted to dev@hive.apache.org by Navis Ryu <na...@nexr.com> on 2014/07/04 02:15:21 UTC

Review Request 23270: Wrong results when union all of grouping followed by group by with correlation optimization

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23270/
-----------------------------------------------------------

Review request for hive.


Bugs: HIVE-7205
    https://issues.apache.org/jira/browse/HIVE-7205


Repository: hive-git


Description
-------

Use case:

Table TBL (a string, b string) contains a single row: 'a','a'.
The following query:
{code:sql}
select b, sum(cc) from (
        select b,count(1) as cc from TBL group by b
        union all
        select a as b,count(1) as cc from TBL group by a
) z
group by b
{code}
returns two rows when hive.optimize.correlation=true is set:

a 1
a 1

With hive.optimize.correlation=false, it returns the correct result: a 2
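
As a cross-check of the expected result, the same query can be run against SQLite from Python. This is only a sketch: SQLite stands in for Hive here (an assumption, with TEXT replacing Hive's string type), so it shows what standard SQL semantics give for this data, not how Hive plans or executes the query.

```python
import sqlite3

# Assumption: SQLite is used as a reference engine in place of Hive.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE TBL (a TEXT, b TEXT)")
conn.execute("INSERT INTO TBL VALUES ('a', 'a')")

# Same query as in the description above.
QUERY = """
SELECT b, SUM(cc) FROM (
    SELECT b, COUNT(1) AS cc FROM TBL GROUP BY b
    UNION ALL
    SELECT a AS b, COUNT(1) AS cc FROM TBL GROUP BY a
) z
GROUP BY b
"""

rows = conn.execute(QUERY).fetchall()
print(rows)  # prints [('a', 2)]
```

Each branch of the UNION ALL contributes one row ('a', 1), and the outer GROUP BY merges them into a single row with sum 2, which matches the result reported with hive.optimize.correlation=false.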


Plan with correlation optimization:
{code:sql}
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_UNION (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME DB TBL))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL b)) (TOK_SELEXPR (TOK_FUNCTION count 1) cc)) (TOK_GROUPBY (TOK_TABLE_OR_COL b)))) (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME DB TBL))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL a) b) (TOK_SELEXPR (TOK_FUNCTION count 1) cc)) (TOK_GROUPBY (TOK_TABLE_OR_COL a))))) z)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL b)) (TOK_SELEXPR (TOK_FUNCTION sum (TOK_TABLE_OR_COL cc)))) (TOK_GROUPBY (TOK_TABLE_OR_COL b))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        null-subquery1:z-subquery1:TBL 
          TableScan
            alias: TBL
            Select Operator
              expressions:
                    expr: b
                    type: string
              outputColumnNames: b
              Group By Operator
                aggregations:
                      expr: count(1)
                bucketGroup: false
                keys:
                      expr: b
                      type: string
                mode: hash
                outputColumnNames: _col0, _col1
                Reduce Output Operator
                  key expressions:
                        expr: _col0
                        type: string
                  sort order: +
                  Map-reduce partition columns:
                        expr: _col0
                        type: string
                  tag: 0
                  value expressions:
                        expr: _col1
                        type: bigint
        null-subquery2:z-subquery2:TBL 
          TableScan
            alias: TBL
            Select Operator
              expressions:
                    expr: a
                    type: string
              outputColumnNames: a
              Group By Operator
                aggregations:
                      expr: count(1)
                bucketGroup: false
                keys:
                      expr: a
                      type: string
                mode: hash
                outputColumnNames: _col0, _col1
                Reduce Output Operator
                  key expressions:
                        expr: _col0
                        type: string
                  sort order: +
                  Map-reduce partition columns:
                        expr: _col0
                        type: string
                  tag: 1
                  value expressions:
                        expr: _col1
                        type: bigint
      Reduce Operator Tree:
        Demux Operator
          Group By Operator
            aggregations:
                  expr: count(VALUE._col0)
            bucketGroup: false
            keys:
                  expr: KEY._col0
                  type: string
            mode: mergepartial
            outputColumnNames: _col0, _col1
            Select Operator
              expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: bigint
              outputColumnNames: _col0, _col1
              Union
                Select Operator
                  expressions:
                        expr: _col0
                        type: string
                        expr: _col1
                        type: bigint
                  outputColumnNames: _col0, _col1
                  Mux Operator
                    Group By Operator
                      aggregations:
                            expr: sum(_col1)
                      bucketGroup: false
                      keys:
                            expr: _col0
                            type: string
                      mode: complete
                      outputColumnNames: _col0, _col1
                      Select Operator
                        expressions:
                              expr: _col0
                              type: string
                              expr: _col1
                              type: bigint
                        outputColumnNames: _col0, _col1
                        File Output Operator
                          compressed: false
                          GlobalTableId: 0
                          table:
                              input format: org.apache.hadoop.mapred.TextInputFormat
                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
          Group By Operator
            aggregations:
                  expr: count(VALUE._col0)
            bucketGroup: false
            keys:
                  expr: KEY._col0
                  type: string
            mode: mergepartial
            outputColumnNames: _col0, _col1
            Select Operator
              expressions:
                    expr: _col0
                    type: string
                    expr: _col1
                    type: bigint
              outputColumnNames: _col0, _col1
              Union
                Select Operator
                  expressions:
                        expr: _col0
                        type: string
                        expr: _col1
                        type: bigint
                  outputColumnNames: _col0, _col1
                  Mux Operator
                    Group By Operator
                      aggregations:
                            expr: sum(_col1)
                      bucketGroup: false
                      keys:
                            expr: _col0
                            type: string
                      mode: complete
                      outputColumnNames: _col0, _col1
                      Select Operator
                        expressions:
                              expr: _col0
                              type: string
                              expr: _col1
                              type: bigint
                        outputColumnNames: _col0, _col1
                        File Output Operator
                          compressed: false
                          GlobalTableId: 0
                          table:
                              input format: org.apache.hadoop.mapred.TextInputFormat
                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1
{code}

Plan without correlation optimization:

{code:sql}
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_UNION (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME DB TBL))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL b)) (TOK_SELEXPR (TOK_FUNCTION count 1) cc)) (TOK_GROUPBY (TOK_TABLE_OR_COL b)))) (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME DB TBL))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL a) b) (TOK_SELEXPR (TOK_FUNCTION count 1) cc)) (TOK_GROUPBY (TOK_TABLE_OR_COL a))))) z)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL b)) (TOK_SELEXPR (TOK_FUNCTION sum (TOK_TABLE_OR_COL cc)))) (TOK_GROUPBY (TOK_TABLE_OR_COL b))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1, Stage-3
  Stage-3 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        null-subquery2:z-subquery2:TBL 
          TableScan
            alias: TBL
            Select Operator
              expressions:
                    expr: a
                    type: string
              outputColumnNames: a
              Group By Operator
                aggregations:
                      expr: count(1)
                bucketGroup: false
                keys:
                      expr: a
                      type: string
                mode: hash
                outputColumnNames: _col0, _col1
                Reduce Output Operator
                  key expressions:
                        expr: _col0
                        type: string
                  sort order: +
                  Map-reduce partition columns:
                        expr: _col0
                        type: string
                  tag: -1
                  value expressions:
                        expr: _col1
                        type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(VALUE._col0)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: string
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: bigint
            outputColumnNames: _col0, _col1
            File Output Operator
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-2
    Map Reduce
      Alias -> Map Operator Tree:
        maprfs:/user/hadoop/tmp/hive/hive_2014-06-10_15-42-37_188_2403850033480056671-5/-mr-10002 
          TableScan
            Union
              Select Operator
                expressions:
                      expr: _col0
                      type: string
                      expr: _col1
                      type: bigint
                outputColumnNames: _col0, _col1
                Group By Operator
                  aggregations:
                        expr: sum(_col1)
                  bucketGroup: false
                  keys:
                        expr: _col0
                        type: string
                  mode: hash
                  outputColumnNames: _col0, _col1
                  Reduce Output Operator
                    key expressions:
                          expr: _col0
                          type: string
                    sort order: +
                    Map-reduce partition columns:
                          expr: _col0
                          type: string
                    tag: -1
                    value expressions:
                          expr: _col1
                          type: bigint
        maprfs:/user/hadoop/tmp/hive/hive_2014-06-10_15-42-37_188_2403850033480056671-5/-mr-10003 
          TableScan
            Union
              Select Operator
                expressions:
                      expr: _col0
                      type: string
                      expr: _col1
                      type: bigint
                outputColumnNames: _col0, _col1
                Group By Operator
                  aggregations:
                        expr: sum(_col1)
                  bucketGroup: false
                  keys:
                        expr: _col0
                        type: string
                  mode: hash
                  outputColumnNames: _col0, _col1
                  Reduce Output Operator
                    key expressions:
                          expr: _col0
                          type: string
                    sort order: +
                    Map-reduce partition columns:
                          expr: _col0
                          type: string
                    tag: -1
                    value expressions:
                          expr: _col1
                          type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: sum(VALUE._col0)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: string
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: bigint
            outputColumnNames: _col0, _col1
            File Output Operator
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-3
    Map Reduce
      Alias -> Map Operator Tree:
        null-subquery1:z-subquery1:TBL 
          TableScan
            alias: TBL
            Select Operator
              expressions:
                    expr: b
                    type: string
              outputColumnNames: b
              Group By Operator
                aggregations:
                      expr: count(1)
                bucketGroup: false
                keys:
                      expr: b
                      type: string
                mode: hash
                outputColumnNames: _col0, _col1
                Reduce Output Operator
                  key expressions:
                        expr: _col0
                        type: string
                  sort order: +
                  Map-reduce partition columns:
                        expr: _col0
                        type: string
                  tag: -1
                  value expressions:
                        expr: _col1
                        type: bigint
      Reduce Operator Tree:
        Group By Operator
          aggregations:
                expr: count(VALUE._col0)
          bucketGroup: false
          keys:
                expr: KEY._col0
                type: string
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Select Operator
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: bigint
            outputColumnNames: _col0, _col1
            File Output Operator
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1
{code}


Diffs
-----

  ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java 03194a4 
  ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java 772dda6 
  ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java 1dde78e 
  ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java 792d87f 
  ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java 91b2369 
  ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java c747099 
  ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java e877cd4 
  ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java b10a7fa 
  ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java db94271 
  ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java a63466a 
  ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java 58d1638 
  ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java e884afd 
  ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java c52f753 
  ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java 43458d9 
  ql/src/test/queries/clientpositive/correlationoptimizer16.q PRE-CREATION 
  ql/src/test/results/clientpositive/correlationoptimizer16.q.out PRE-CREATION 

Diff: https://reviews.apache.org/r/23270/diff/


Testing
-------


Thanks,

Navis Ryu


Re: Review Request 23270: Wrong results when union all of grouping followed by group by with correlation optimization

Posted by Yin Huai <hu...@cse.ohio-state.edu>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23270/#review47707
-----------------------------------------------------------



ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
<https://reviews.apache.org/r/23270/#comment83868>

    What does flush do?



ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java
<https://reviews.apache.org/r/23270/#comment83867>

    Why remove this method? The rows in a key group are sorted by tag. When we see a new tag, we can call end group for the operators that have smaller tags. Also, the JoinOperator assumes that rows are sorted by tag. I think we need this method to make sure that, for the optimized plan, the JoinOperator still gets rows sorted by tag (within a key group).
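
The tag-ordering argument in this comment can be illustrated with a small simulation. This is a hypothetical sketch in Python, not Hive's DemuxOperator code: the function name `demux`, the `opN` labels, and the log format are all invented for illustration. It assumes rows arrive sorted by key and, within a key group, by tag, and it ends the group of every operator with a smaller tag as soon as a new tag appears.

```python
def demux(rows):
    """Simulate dispatching (key, tag, value) rows to per-tag child operators.

    Hypothetical model, not Hive code. Returns a log of the start/row/end
    events each child operator would observe.
    """
    log = []
    current_key = None
    last_tag = None
    open_tags = []  # tags whose group is still open for current_key

    for key, tag, value in rows:
        if key != current_key:
            # New key group: close every group still open for the old key.
            for t in open_tags:
                log.append(f"op{t} end {current_key}")
            open_tags = []
            current_key, last_tag = key, None

        if last_tag is not None and tag < last_tag:
            raise ValueError("rows within a key group must be sorted by tag")

        if tag != last_tag:
            # A new tag: operators with smaller tags get no more rows
            # for this key, so their groups can be ended now.
            still_open = []
            for t in open_tags:
                if t < tag:
                    log.append(f"op{t} end {key}")
                else:
                    still_open.append(t)
            open_tags = still_open + [tag]
            log.append(f"op{tag} start {key}")
            last_tag = tag

        log.append(f"op{tag} row {key}={value}")

    # End of input: close whatever is still open.
    for t in open_tags:
        log.append(f"op{t} end {current_key}")
    return log


events = demux([("k1", 0, 1), ("k1", 1, 1), ("k2", 0, 1)])
for line in events:
    print(line)
```

In this model, op0 finishes key k1 before op1 sees its first row for k1, which is the ordering property the comment says downstream operators such as JoinOperator rely on.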



ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
<https://reviews.apache.org/r/23270/#comment83873>

    Do we need this?



ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
<https://reviews.apache.org/r/23270/#comment83877>

    It seems the logic here checks whether we are processing the last alias of this JoinOperator. Because endGroupIfNecessary is removed in this patch, rows within a key group may not be sorted by tag. I am not sure this is what we want, because the behavior of the JoinOperator under an optimized plan may differ from its behavior under a non-optimized plan. I mean, the rightmost table may not be the stream table for a plan optimized by the correlation optimizer.



ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
<https://reviews.apache.org/r/23270/#comment83874>

    Do we need this?



ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
<https://reviews.apache.org/r/23270/#comment83872>

    Do we need this?



ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
<https://reviews.apache.org/r/23270/#comment83866>

    Seems we do not need this line, right?



ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
<https://reviews.apache.org/r/23270/#comment83869>

    Do we need this?



ql/src/test/queries/clientpositive/correlationoptimizer16.q
<https://reviews.apache.org/r/23270/#comment83870>

    I think correlationoptimizer8 is for cases with UNION ALL. Can we add the test queries to that file?


- Yin Huai


>               Group By Operator
>                 aggregations:
>                       expr: count(1)
>                 bucketGroup: false
>                 keys:
>                       expr: b
>                       type: string
>                 mode: hash
>                 outputColumnNames: _col0, _col1
>                 Reduce Output Operator
>                   key expressions:
>                         expr: _col0
>                         type: string
>                   sort order: +
>                   Map-reduce partition columns:
>                         expr: _col0
>                         type: string
>                   tag: -1
>                   value expressions:
>                         expr: _col1
>                         type: bigint
>       Reduce Operator Tree:
>         Group By Operator
>           aggregations:
>                 expr: count(VALUE._col0)
>           bucketGroup: false
>           keys:
>                 expr: KEY._col0
>                 type: string
>           mode: mergepartial
>           outputColumnNames: _col0, _col1
>           Select Operator
>             expressions:
>                   expr: _col0
>                   type: string
>                   expr: _col1
>                   type: bigint
>             outputColumnNames: _col0, _col1
>             File Output Operator
>               compressed: false
>               GlobalTableId: 0
>               table:
>                   input format: org.apache.hadoop.mapred.SequenceFileInputFormat
>                   output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
> 
>   Stage: Stage-0
>     Fetch Operator
>       limit: -1
> {code}
> 
> 
> Diffs
> -----
> 
>   ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java 03194a4 
>   ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java 772dda6 
>   ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java 1dde78e 
>   ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java 792d87f 
>   ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java 91b2369 
>   ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java c747099 
>   ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java e877cd4 
>   ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java b10a7fa 
>   ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java db94271 
>   ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java a63466a 
>   ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java 58d1638 
>   ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java e884afd 
>   ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java c52f753 
>   ql/src/test/org/apache/hadoop/hive/ql/exec/vector/util/FakeCaptureOutputOperator.java 43458d9 
>   ql/src/test/queries/clientpositive/correlationoptimizer16.q PRE-CREATION 
>   ql/src/test/results/clientpositive/correlationoptimizer16.q.out PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/23270/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Navis Ryu
> 
>