You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2015/10/23 17:31:02 UTC

[5/5] hive git commit: HIVE-11954: Extend logic to choose side table in MapJoin Conversion algorithm (Jesus Camacho Rodriguez, reviewed by Laljo John Pullokkaran)

HIVE-11954: Extend logic to choose side table in MapJoin Conversion algorithm (Jesus Camacho Rodriguez, reviewed by Laljo John Pullokkaran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8e62edac
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8e62edac
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8e62edac

Branch: refs/heads/master
Commit: 8e62edac34538d38d3ba4db158481f5d7735199f
Parents: 37a8fe0
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Thu Oct 22 13:43:14 2015 -0700
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Fri Oct 23 08:30:15 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/OperatorUtils.java      |   21 +
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   |   99 +-
 .../llap/bucket_map_join_tez1.q.out             |  244 ++--
 .../llap/bucket_map_join_tez2.q.out             |  112 +-
 .../llap/dynamic_partition_pruning.q.out        |  130 ++-
 .../clientpositive/llap/explainuser_2.q.out     | 1070 +++++++++---------
 .../test/results/clientpositive/llap/mrr.q.out  |   53 +-
 .../vectorized_dynamic_partition_pruning.q.out  |  130 ++-
 .../tez/auto_sortmerge_join_10.q.out            |   57 +-
 .../tez/auto_sortmerge_join_12.q.out            |   95 +-
 .../tez/bucket_map_join_tez1.q.out              |  236 ++--
 .../tez/bucket_map_join_tez2.q.out              |  108 +-
 .../tez/cross_product_check_2.q.out             |  195 ++--
 .../tez/dynamic_partition_pruning.q.out         |  128 ++-
 .../clientpositive/tez/explainuser_2.q.out      | 1070 +++++++++---------
 .../test/results/clientpositive/tez/mrr.q.out   |   53 +-
 .../clientpositive/tez/unionDistinct_1.q.out    |   58 +-
 .../vectorized_dynamic_partition_pruning.q.out  |  130 ++-
 18 files changed, 2004 insertions(+), 1985 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8e62edac/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index f00fc77..bd10912 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -277,4 +277,25 @@ public class OperatorUtils {
     }
     return resultMap.build();
   }
+
+  /**
+   * Given an operator and a set of classes, returns the number of distinct operators found
+   * upstream that are instances of any of the given classes.
+   *
+   * @param start the start operator
+   * @param classes the set of classes
+   * @return the number of operators
+   */
+  public static int countOperatorsUpstream(Operator<?> start, Set<Class<? extends Operator<?>>> classes) {
+    Multimap<Class<? extends Operator<?>>, Operator<?>> ops = classifyOperatorsUpstream(start, classes);
+    int numberOperators = 0;
+    Set<Operator<?>> uniqueOperators = new HashSet<Operator<?>>();
+    for (Operator<?> op : ops.values()) {
+      if (uniqueOperators.add(op)) {
+        numberOperators++;
+      }
+    }
+    return numberOperators;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8e62edac/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 024849e..e63de7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -32,18 +32,22 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
 import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
 import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MuxOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.UDTFOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -61,6 +65,8 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import com.google.common.collect.ImmutableSet;
+
 /**
  * ConvertJoinMapJoin is an optimization that replaces a common join
  * (aka shuffle join) with a map join (aka broadcast or fragment replicate
@@ -70,7 +76,18 @@ import org.apache.hadoop.util.ReflectionUtils;
  */
 public class ConvertJoinMapJoin implements NodeProcessor {
 
-  static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName());
+  private static final Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName());
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private static final Set<Class<? extends Operator<?>>> COSTLY_OPERATORS =
+          new ImmutableSet.Builder()
+                  .add(CommonJoinOperator.class)
+                  .add(GroupByOperator.class)
+                  .add(LateralViewJoinOperator.class)
+                  .add(PTFOperator.class)
+                  .add(ReduceSinkOperator.class)
+                  .add(UDTFOperator.class)
+                  .build();
 
   @Override
   /*
@@ -538,16 +555,20 @@ public class ConvertJoinMapJoin implements NodeProcessor {
         HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
 
     int bigTablePosition = -1;
-
+    // number of costly ops (Join, GB, LateralViewJoin, PTF/Windowing, RS, UDTF) upstream of the big input
+    int bigInputNumberCostlyOps = -1;
+    // stats of the big input
     Statistics bigInputStat = null;
-    long totalSize = 0;
-    int pos = 0;
 
     // bigTableFound means we've encountered a table that's bigger than the
     // max. This table is either the the big table or we cannot convert.
-    boolean bigTableFound = false;
+    boolean foundInputNotFittingInMemory = false;
 
-    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+    // total size of the inputs
+    long totalSize = 0;
+
+    for (int pos = 0; pos < joinOp.getParentOperators().size(); pos++) {
+      Operator<? extends OperatorDesc> parentOp = joinOp.getParentOperators().get(pos);
 
       Statistics currInputStat = parentOp.getStatistics();
       if (currInputStat == null) {
@@ -556,15 +577,17 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       }
 
       long inputSize = currInputStat.getDataSize();
+
+      boolean currentInputNotFittingInMemory = false;
       if ((bigInputStat == null)
-          || ((bigInputStat != null) && (inputSize > bigInputStat.getDataSize()))) {
+              || ((bigInputStat != null) && (inputSize > bigInputStat.getDataSize()))) {
 
-        if (bigTableFound) {
+        if (foundInputNotFittingInMemory) {
           // cannot convert to map join; we've already chosen a big table
           // on size and there's another one that's bigger.
           return -1;
         }
-
+        
         if (inputSize/buckets > maxSize) {
           if (!bigTableCandidateSet.contains(pos)) {
             // can't use the current table as the big table, but it's too
@@ -572,33 +595,46 @@ public class ConvertJoinMapJoin implements NodeProcessor {
             return -1;
           }
 
-          bigTableFound = true;
+          currentInputNotFittingInMemory = true;
+          foundInputNotFittingInMemory = true;
         }
+      }
 
-        if (bigInputStat != null) {
-          // we're replacing the current big table with a new one. Need
-          // to count the current one as a map table then.
-          totalSize += bigInputStat.getDataSize();
-        }
+      int currentInputNumberCostlyOps = foundInputNotFittingInMemory ?
+              -1 : OperatorUtils.countOperatorsUpstream(parentOp, COSTLY_OPERATORS);
+
+      // This input is the big table if it is contained in the big candidates set, and either:
+      // 1) we have not chosen a big table yet, or
+      // 2) it does not fit in memory (so it was flagged as the big table above), or
+      // 3) no input so far exceeds memory and this input has more costly operators, or
+      // 4) no input so far exceeds memory, the costly operator counts are equal, and it is bigger.
+      boolean selectedBigTable = bigTableCandidateSet.contains(pos) &&
+              (bigInputStat == null || currentInputNotFittingInMemory ||
+                      (!foundInputNotFittingInMemory && (currentInputNumberCostlyOps > bigInputNumberCostlyOps ||
+                              (currentInputNumberCostlyOps == bigInputNumberCostlyOps && inputSize > bigInputStat.getDataSize()))));
+
+      if (bigInputStat != null && selectedBigTable) {
+        // We are replacing the current big table with a new one, so the
+        // previous big table must now be counted as a map table.
+        totalSize += bigInputStat.getDataSize();
+      } else if (!selectedBigTable) {
+        // This input was not selected as the big table, so its size is
+        // added to the total size of the map (small) tables.
+        totalSize += inputSize;
+      }
 
-        if (totalSize/buckets > maxSize) {
-          // sum of small tables size in this join exceeds configured limit
-          // hence cannot convert.
-          return -1;
-        }
+      if (totalSize/buckets > maxSize) {
+        // sum of small tables size in this join exceeds configured limit
+        // hence cannot convert.
+        return -1;
+      }
 
-        if (bigTableCandidateSet.contains(pos)) {
-          bigTablePosition = pos;
-          bigInputStat = currInputStat;
-        }
-      } else {
-        totalSize += currInputStat.getDataSize();
-        if (totalSize/buckets > maxSize) {
-          // cannot hold all map tables in memory. Cannot convert.
-          return -1;
-        }
+      if (selectedBigTable) {
+        bigTablePosition = pos;
+        bigInputNumberCostlyOps = currentInputNumberCostlyOps;
+        bigInputStat = currInputStat;
       }
-      pos++;
+
     }
 
     return bigTablePosition;
@@ -616,7 +652,6 @@ public class ConvertJoinMapJoin implements NodeProcessor {
    *
    * for tez.
    */
-
   public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
       int bigTablePosition, boolean removeReduceSink) throws SemanticException {
     // bail on mux operator because currently the mux operator masks the emit keys

http://git-wip-us.apache.org/repos/asf/hive/blob/8e62edac/ql/src/test/results/clientpositive/llap/bucket_map_join_tez1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/bucket_map_join_tez1.q.out b/ql/src/test/results/clientpositive/llap/bucket_map_join_tez1.q.out
index 4699e10..1f1bf3d 100644
--- a/ql/src/test/results/clientpositive/llap/bucket_map_join_tez1.q.out
+++ b/ql/src/test/results/clientpositive/llap/bucket_map_join_tez1.q.out
@@ -329,8 +329,7 @@ STAGE PLANS:
   Stage: Stage-1
     Tez
       Edges:
-        Map 3 <- Reducer 2 (CUSTOM_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -362,31 +361,15 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
-                    Map Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      keys:
-                        0 _col1 (type: int)
-                        1 key (type: int)
-                      outputColumnNames: _col0, _col1, _col3
-                      input vertices:
-                        0 Reducer 2
-                      Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
-                      HybridGraceHashJoin: true
-                      Select Operator
-                        expressions: _col1 (type: int), _col0 (type: double), _col3 (type: string)
-                        outputColumnNames: _col0, _col1, _col2
-                        Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
-                        File Output Operator
-                          compressed: false
-                          Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
-                          table:
-                              input format: org.apache.hadoop.mapred.TextInputFormat
-                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    Reduce Output Operator
+                      key expressions: key (type: int)
+                      sort order: +
+                      Map-reduce partition columns: key (type: int)
+                      Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: value (type: string)
             Execution mode: llap
         Reducer 2 
-            Execution mode: llap
+            Execution mode: uber
             Reduce Operator Tree:
               Group By Operator
                 aggregations: sum(VALUE._col0)
@@ -398,12 +381,28 @@ STAGE PLANS:
                   expressions: _col1 (type: double), _col0 (type: int)
                   outputColumnNames: _col0, _col1
                   Statistics: Num rows: 7 Data size: 728 Basic stats: COMPLETE Column stats: NONE
-                  Reduce Output Operator
-                    key expressions: _col1 (type: int)
-                    sort order: +
-                    Map-reduce partition columns: _col1 (type: int)
-                    Statistics: Num rows: 7 Data size: 728 Basic stats: COMPLETE Column stats: NONE
-                    value expressions: _col0 (type: double)
+                  Map Join Operator
+                    condition map:
+                         Inner Join 0 to 1
+                    keys:
+                      0 _col1 (type: int)
+                      1 key (type: int)
+                    outputColumnNames: _col0, _col1, _col3
+                    input vertices:
+                      1 Map 3
+                    Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
+                    HybridGraceHashJoin: true
+                    Select Operator
+                      expressions: _col1 (type: int), _col0 (type: double), _col3 (type: string)
+                      outputColumnNames: _col0, _col1, _col2
+                      Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
+                        table:
+                            input format: org.apache.hadoop.mapred.TextInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
@@ -556,8 +555,7 @@ STAGE PLANS:
     Tez
       Edges:
         Map 1 <- Map 3 (CUSTOM_EDGE)
-        Map 4 <- Reducer 2 (CUSTOM_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -614,31 +612,15 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                    Map Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      keys:
-                        0 _col1 (type: int)
-                        1 key (type: int)
-                      outputColumnNames: _col0, _col1, _col3
-                      input vertices:
-                        0 Reducer 2
-                      Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
-                      HybridGraceHashJoin: true
-                      Select Operator
-                        expressions: _col1 (type: int), _col0 (type: double), _col3 (type: string)
-                        outputColumnNames: _col0, _col1, _col2
-                        Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
-                        File Output Operator
-                          compressed: false
-                          Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
-                          table:
-                              input format: org.apache.hadoop.mapred.TextInputFormat
-                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    Reduce Output Operator
+                      key expressions: key (type: int)
+                      sort order: +
+                      Map-reduce partition columns: key (type: int)
+                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: value (type: string)
             Execution mode: llap
         Reducer 2 
-            Execution mode: llap
+            Execution mode: uber
             Reduce Operator Tree:
               Group By Operator
                 aggregations: sum(VALUE._col0)
@@ -650,12 +632,28 @@ STAGE PLANS:
                   expressions: _col1 (type: double), _col0 (type: int)
                   outputColumnNames: _col0, _col1
                   Statistics: Num rows: 66 Data size: 700 Basic stats: COMPLETE Column stats: NONE
-                  Reduce Output Operator
-                    key expressions: _col1 (type: int)
-                    sort order: +
-                    Map-reduce partition columns: _col1 (type: int)
-                    Statistics: Num rows: 66 Data size: 700 Basic stats: COMPLETE Column stats: NONE
-                    value expressions: _col0 (type: double)
+                  Map Join Operator
+                    condition map:
+                         Inner Join 0 to 1
+                    keys:
+                      0 _col1 (type: int)
+                      1 key (type: int)
+                    outputColumnNames: _col0, _col1, _col3
+                    input vertices:
+                      1 Map 4
+                    Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                    HybridGraceHashJoin: true
+                    Select Operator
+                      expressions: _col1 (type: int), _col0 (type: double), _col3 (type: string)
+                      outputColumnNames: _col0, _col1, _col2
+                      Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                        table:
+                            input format: org.apache.hadoop.mapred.TextInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
@@ -871,8 +869,7 @@ STAGE PLANS:
   Stage: Stage-1
     Tez
       Edges:
-        Map 3 <- Reducer 2 (CUSTOM_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -904,31 +901,15 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                    Map Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      keys:
-                        0 _col0 (type: int)
-                        1 key (type: int)
-                      outputColumnNames: _col0, _col1, _col3
-                      input vertices:
-                        0 Reducer 2
-                      Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
-                      HybridGraceHashJoin: true
-                      Select Operator
-                        expressions: _col0 (type: int), _col1 (type: double), _col3 (type: string)
-                        outputColumnNames: _col0, _col1, _col2
-                        Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
-                        File Output Operator
-                          compressed: false
-                          Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
-                          table:
-                              input format: org.apache.hadoop.mapred.TextInputFormat
-                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    Reduce Output Operator
+                      key expressions: key (type: int)
+                      sort order: +
+                      Map-reduce partition columns: key (type: int)
+                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: value (type: string)
             Execution mode: llap
         Reducer 2 
-            Execution mode: llap
+            Execution mode: uber
             Reduce Operator Tree:
               Group By Operator
                 aggregations: sum(VALUE._col0)
@@ -936,12 +917,28 @@ STAGE PLANS:
                 mode: mergepartial
                 outputColumnNames: _col0, _col1
                 Statistics: Num rows: 7 Data size: 728 Basic stats: COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: _col0 (type: int)
-                  sort order: +
-                  Map-reduce partition columns: _col0 (type: int)
-                  Statistics: Num rows: 7 Data size: 728 Basic stats: COMPLETE Column stats: NONE
-                  value expressions: _col1 (type: double)
+                Map Join Operator
+                  condition map:
+                       Inner Join 0 to 1
+                  keys:
+                    0 _col0 (type: int)
+                    1 key (type: int)
+                  outputColumnNames: _col0, _col1, _col3
+                  input vertices:
+                    1 Map 3
+                  Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                  HybridGraceHashJoin: true
+                  Select Operator
+                    expressions: _col0 (type: int), _col1 (type: double), _col3 (type: string)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
@@ -967,8 +964,7 @@ STAGE PLANS:
   Stage: Stage-1
     Tez
       Edges:
-        Map 3 <- Reducer 2 (CUSTOM_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -994,31 +990,15 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                    Map Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      keys:
-                        0 _col0 (type: int)
-                        1 key (type: int)
-                      outputColumnNames: _col0, _col1, _col3
-                      input vertices:
-                        0 Reducer 2
-                      Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
-                      HybridGraceHashJoin: true
-                      Select Operator
-                        expressions: _col0 (type: int), _col1 (type: double), _col3 (type: string)
-                        outputColumnNames: _col0, _col1, _col2
-                        Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
-                        File Output Operator
-                          compressed: false
-                          Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
-                          table:
-                              input format: org.apache.hadoop.mapred.TextInputFormat
-                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    Reduce Output Operator
+                      key expressions: key (type: int)
+                      sort order: +
+                      Map-reduce partition columns: key (type: int)
+                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: value (type: string)
             Execution mode: llap
         Reducer 2 
-            Execution mode: llap
+            Execution mode: uber
             Reduce Operator Tree:
               Group By Operator
                 aggregations: sum(VALUE._col0)
@@ -1026,12 +1006,28 @@ STAGE PLANS:
                 mode: complete
                 outputColumnNames: _col0, _col1
                 Statistics: Num rows: 7 Data size: 728 Basic stats: COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: _col0 (type: int)
-                  sort order: +
-                  Map-reduce partition columns: _col0 (type: int)
-                  Statistics: Num rows: 7 Data size: 728 Basic stats: COMPLETE Column stats: NONE
-                  value expressions: _col1 (type: double)
+                Map Join Operator
+                  condition map:
+                       Inner Join 0 to 1
+                  keys:
+                    0 _col0 (type: int)
+                    1 key (type: int)
+                  outputColumnNames: _col0, _col1, _col3
+                  input vertices:
+                    1 Map 3
+                  Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                  HybridGraceHashJoin: true
+                  Select Operator
+                    expressions: _col0 (type: int), _col1 (type: double), _col3 (type: string)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/8e62edac/ql/src/test/results/clientpositive/llap/bucket_map_join_tez2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/bucket_map_join_tez2.q.out b/ql/src/test/results/clientpositive/llap/bucket_map_join_tez2.q.out
index 111aaaa..68d1253 100644
--- a/ql/src/test/results/clientpositive/llap/bucket_map_join_tez2.q.out
+++ b/ql/src/test/results/clientpositive/llap/bucket_map_join_tez2.q.out
@@ -527,8 +527,7 @@ STAGE PLANS:
   Stage: Stage-1
     Tez
       Edges:
-        Map 3 <- Reducer 2 (CUSTOM_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -558,38 +557,38 @@ STAGE PLANS:
                   Filter Operator
                     predicate: key is not null (type: boolean)
                     Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
-                    Map Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      keys:
-                        0 _col0 (type: int)
-                        1 key (type: int)
-                      outputColumnNames: _col0, _col1
-                      input vertices:
-                        0 Reducer 2
-                      Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
-                      HybridGraceHashJoin: true
-                      File Output Operator
-                        compressed: false
-                        Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
-                        table:
-                            input format: org.apache.hadoop.mapred.TextInputFormat
-                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    Reduce Output Operator
+                      key expressions: key (type: int)
+                      sort order: +
+                      Map-reduce partition columns: key (type: int)
+                      Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
             Execution mode: llap
         Reducer 2 
-            Execution mode: llap
+            Execution mode: uber
             Reduce Operator Tree:
               Group By Operator
                 keys: KEY._col0 (type: int)
                 mode: mergepartial
                 outputColumnNames: _col0
                 Statistics: Num rows: 60 Data size: 636 Basic stats: COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: _col0 (type: int)
-                  sort order: +
-                  Map-reduce partition columns: _col0 (type: int)
-                  Statistics: Num rows: 60 Data size: 636 Basic stats: COMPLETE Column stats: NONE
+                Map Join Operator
+                  condition map:
+                       Inner Join 0 to 1
+                  keys:
+                    0 _col0 (type: int)
+                    1 key (type: int)
+                  outputColumnNames: _col0, _col1
+                  input vertices:
+                    1 Map 3
+                  Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
+                  HybridGraceHashJoin: true
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator
@@ -609,8 +608,7 @@ STAGE PLANS:
   Stage: Stage-1
     Tez
       Edges:
-        Map 3 <- Reducer 2 (BROADCAST_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (BROADCAST_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -640,43 +638,43 @@ STAGE PLANS:
                   Filter Operator
                     predicate: UDFToDouble(key) is not null (type: boolean)
                     Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
-                    Map Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      keys:
-                        0 UDFToDouble(_col0) (type: double)
-                        1 UDFToDouble(key) (type: double)
-                      outputColumnNames: _col0, _col2
-                      input vertices:
-                        0 Reducer 2
-                      Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
-                      HybridGraceHashJoin: true
-                      Select Operator
-                        expressions: _col0 (type: string), _col2 (type: string)
-                        outputColumnNames: _col0, _col1
-                        Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
-                        File Output Operator
-                          compressed: false
-                          Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
-                          table:
-                              input format: org.apache.hadoop.mapred.TextInputFormat
-                              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    Reduce Output Operator
+                      key expressions: UDFToDouble(key) (type: double)
+                      sort order: +
+                      Map-reduce partition columns: UDFToDouble(key) (type: double)
+                      Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: value (type: string)
             Execution mode: llap
         Reducer 2 
-            Execution mode: llap
+            Execution mode: uber
             Reduce Operator Tree:
               Group By Operator
                 keys: KEY._col0 (type: string)
                 mode: mergepartial
                 outputColumnNames: _col0
                 Statistics: Num rows: 60 Data size: 636 Basic stats: COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: UDFToDouble(_col0) (type: double)
-                  sort order: +
-                  Map-reduce partition columns: UDFToDouble(_col0) (type: double)
-                  Statistics: Num rows: 60 Data size: 636 Basic stats: COMPLETE Column stats: NONE
-                  value expressions: _col0 (type: string)
+                Map Join Operator
+                  condition map:
+                       Inner Join 0 to 1
+                  keys:
+                    0 UDFToDouble(_col0) (type: double)
+                    1 UDFToDouble(key) (type: double)
+                  outputColumnNames: _col0, _col2
+                  input vertices:
+                    1 Map 3
+                  Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
+                  HybridGraceHashJoin: true
+                  Select Operator
+                    expressions: _col0 (type: string), _col2 (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
   Stage: Stage-0
     Fetch Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/8e62edac/ql/src/test/results/clientpositive/llap/dynamic_partition_pruning.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/dynamic_partition_pruning.q.out b/ql/src/test/results/clientpositive/llap/dynamic_partition_pruning.q.out
index 3ebd690..4320f01 100644
--- a/ql/src/test/results/clientpositive/llap/dynamic_partition_pruning.q.out
+++ b/ql/src/test/results/clientpositive/llap/dynamic_partition_pruning.q.out
@@ -4262,7 +4262,7 @@ POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
 POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
 #### A masked pattern was here ####
 1000
-Warning: Map Join MAPJOIN[21][bigTable=?] in task 'Map 1' is a cross product
+Warning: Map Join MAPJOIN[21][bigTable=?] in task 'Reducer 3' is a cross product
 PREHOOK: query: -- parent is reduce tasks
 EXPLAIN select count(*) from srcpart join (select ds as ds, ds as `date` from srcpart group by ds) s on (srcpart.ds = s.ds) where s.`date` = '2008-04-08'
 PREHOOK: type: QUERY
@@ -4277,9 +4277,8 @@ STAGE PLANS:
   Stage: Stage-1
     Tez
       Edges:
-        Map 1 <- Reducer 4 (BROADCAST_EDGE)
-        Reducer 2 <- Map 1 (SIMPLE_EDGE)
-        Reducer 4 <- Map 3 (SIMPLE_EDGE)
+        Reducer 3 <- Map 1 (BROADCAST_EDGE), Map 2 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -4290,26 +4289,11 @@ STAGE PLANS:
                   Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE
-                    Map Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      keys:
-                        0 
-                        1 
-                      input vertices:
-                        1 Reducer 4
-                      Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE
-                      Group By Operator
-                        aggregations: count()
-                        mode: hash
-                        outputColumnNames: _col0
-                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
-                        Reduce Output Operator
-                          sort order: 
-                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
-                          value expressions: _col0 (type: bigint)
+                    Reduce Output Operator
+                      sort order: 
+                      Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE
             Execution mode: llap
-        Map 3 
+        Map 2 
             Map Operator Tree:
                 TableScan
                   alias: srcpart
@@ -4330,7 +4314,35 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 1000 Data size: 10624 Basic stats: COMPLETE Column stats: NONE
             Execution mode: llap
-        Reducer 2 
+        Reducer 3 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Map Join Operator
+                    condition map:
+                         Inner Join 0 to 1
+                    keys:
+                      0 
+                      1 
+                    input vertices:
+                      0 Map 1
+                    Statistics: Num rows: 1100 Data size: 11686 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: count()
+                      mode: hash
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        sort order: 
+                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col0 (type: bigint)
+        Reducer 4 
             Execution mode: uber
             Reduce Operator Tree:
               Group By Operator
@@ -4345,19 +4357,6 @@ STAGE PLANS:
                       input format: org.apache.hadoop.mapred.TextInputFormat
                       output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                       serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-        Reducer 4 
-            Execution mode: llap
-            Reduce Operator Tree:
-              Group By Operator
-                keys: KEY._col0 (type: string)
-                mode: mergepartial
-                outputColumnNames: _col0
-                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                Select Operator
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Reduce Output Operator
-                    sort order: 
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
 
   Stage: Stage-0
     Fetch Operator
@@ -4365,7 +4364,7 @@ STAGE PLANS:
       Processor Tree:
         ListSink
 
-Warning: Map Join MAPJOIN[21][bigTable=?] in task 'Map 1' is a cross product
+Warning: Map Join MAPJOIN[21][bigTable=?] in task 'Reducer 3' is a cross product
 PREHOOK: query: select count(*) from srcpart join (select ds as ds, ds as `date` from srcpart group by ds) s on (srcpart.ds = s.ds) where s.`date` = '2008-04-08'
 PREHOOK: type: QUERY
 PREHOOK: Input: default@srcpart
@@ -4812,9 +4811,8 @@ STAGE PLANS:
   Stage: Stage-1
     Tez
       Edges:
-        Map 2 <- Map 1 (BROADCAST_EDGE)
-        Map 3 <- Map 2 (BROADCAST_EDGE)
-        Reducer 4 <- Map 3 (SIMPLE_EDGE)
+        Map 2 <- Map 1 (BROADCAST_EDGE), Map 4 (BROADCAST_EDGE)
+        Reducer 3 <- Map 2 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -4851,13 +4849,27 @@ STAGE PLANS:
                         0 Map 1
                       Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                       HybridGraceHashJoin: true
-                      Reduce Output Operator
-                        key expressions: '13' (type: string)
-                        sort order: +
-                        Map-reduce partition columns: '13' (type: string)
+                      Map Join Operator
+                        condition map:
+                             Inner Join 0 to 1
+                        keys:
+                          0 '13' (type: string)
+                          1 '13' (type: string)
+                        input vertices:
+                          1 Map 4
                         Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
+                        HybridGraceHashJoin: true
+                        Group By Operator
+                          aggregations: count()
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                          Reduce Output Operator
+                            sort order: 
+                            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+                            value expressions: _col0 (type: bigint)
             Execution mode: llap
-        Map 3 
+        Map 4 
             Map Operator Tree:
                 TableScan
                   alias: srcpart_hour
@@ -4866,27 +4878,13 @@ STAGE PLANS:
                   Filter Operator
                     predicate: (hr = 13) (type: boolean)
                     Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
-                    Map Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      keys:
-                        0 '13' (type: string)
-                        1 '13' (type: string)
-                      input vertices:
-                        0 Map 2
-                      Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
-                      HybridGraceHashJoin: true
-                      Group By Operator
-                        aggregations: count()
-                        mode: hash
-                        outputColumnNames: _col0
-                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
-                        Reduce Output Operator
-                          sort order: 
-                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
-                          value expressions: _col0 (type: bigint)
+                    Reduce Output Operator
+                      key expressions: '13' (type: string)
+                      sort order: +
+                      Map-reduce partition columns: '13' (type: string)
+                      Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
             Execution mode: llap
-        Reducer 4 
+        Reducer 3 
             Execution mode: uber
             Reduce Operator Tree:
               Group By Operator