Posted to commits@hive.apache.org by jc...@apache.org on 2018/01/05 02:44:48 UTC

[8/8] hive git commit: HIVE-18361: Extend shared work optimizer to reuse computation beyond work boundaries (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

HIVE-18361: Extend shared work optimizer to reuse computation beyond work boundaries (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/96a409e1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/96a409e1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/96a409e1

Branch: refs/heads/master
Commit: 96a409e1c6ec846eeb6c72b50bed60790ccc1836
Parents: 3f5148d
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Thu Dec 21 17:08:07 2017 -0800
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Thu Jan 4 18:44:15 2018 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    4 +
 .../test/resources/testconfiguration.properties |    1 +
 .../hive/ql/optimizer/SharedWorkOptimizer.java  |  407 ++++++-
 .../test/queries/clientpositive/sharedworkext.q |   53 +
 .../clientpositive/llap/explainuser_1.q.out     |   47 +-
 .../clientpositive/llap/sharedworkext.q.out     | 1076 ++++++++++++++++++
 .../clientpositive/llap/subquery_multi.q.out    |  185 +--
 .../clientpositive/llap/subquery_notin.q.out    |  752 +++++-------
 .../clientpositive/llap/subquery_scalar.q.out   |  167 +--
 .../clientpositive/llap/subquery_select.q.out   |  110 +-
 .../clientpositive/llap/subquery_views.q.out    |   83 +-
 .../llap/vector_groupby_grouping_id2.q.out      |  138 +--
 .../llap/vector_groupby_grouping_sets4.q.out    |   71 +-
 .../clientpositive/perf/tez/query14.q.out       |  264 ++---
 .../clientpositive/perf/tez/query2.q.out        |  103 +-
 .../clientpositive/perf/tez/query23.q.out       |  360 ++----
 .../clientpositive/perf/tez/query32.q.out       |   30 +-
 .../clientpositive/perf/tez/query33.q.out       |  232 ++--
 .../clientpositive/perf/tez/query44.q.out       |   69 +-
 .../clientpositive/perf/tez/query47.q.out       |  224 ++--
 .../clientpositive/perf/tez/query54.q.out       |   34 +-
 .../clientpositive/perf/tez/query56.q.out       |  232 ++--
 .../clientpositive/perf/tez/query57.q.out       |  224 ++--
 .../clientpositive/perf/tez/query58.q.out       |  200 +---
 .../clientpositive/perf/tez/query59.q.out       |   52 +-
 .../clientpositive/perf/tez/query60.q.out       |  232 ++--
 .../clientpositive/perf/tez/query61.q.out       |  118 +-
 .../clientpositive/perf/tez/query64.q.out       |  468 +++-----
 .../clientpositive/perf/tez/query70.q.out       |   66 +-
 .../clientpositive/perf/tez/query83.q.out       |  116 +-
 .../clientpositive/perf/tez/query90.q.out       |   50 +-
 .../clientpositive/perf/tez/query92.q.out       |   30 +-
 32 files changed, 3056 insertions(+), 3142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 841d075..6529da6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1695,6 +1695,10 @@ public class HiveConf extends Configuration {
     HIVE_SHARED_WORK_OPTIMIZATION("hive.optimize.shared.work", true,
         "Whether to enable shared work optimizer. The optimizer finds scan operator over the same table\n" +
         "and follow-up operators in the query plan and merges them if they meet some preconditions. Tez only."),
+    HIVE_SHARED_WORK_EXTENDED_OPTIMIZATION("hive.optimize.shared.work.extended", true,
+        "Whether to enable shared work extended optimizer. The optimizer tries to merge equal operators\n" +
+        "after a work boundary after shared work optimizer has been executed. Requires hive.optimize.shared.work\n" +
+        "to be set to true. Tez only."),
     HIVE_COMBINE_EQUIVALENT_WORK_OPTIMIZATION("hive.combine.equivalent.work.optimization", true, "Whether to " +
             "combine equivalent work objects during physical optimization.\n This optimization looks for equivalent " +
             "work objects and combines them if they meet certain preconditions. Spark only."),

http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 98e390c..ac81995 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -628,6 +628,7 @@ minillaplocal.query.files=\
   semijoin6.q,\
   semijoin7.q,\
   semijoin_hint.q,\
+  sharedworkext.q,\
   smb_cache.q,\
   special_character_in_tabnames_1.q,\
   sqlmerge.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
index d4ddb75..08fec42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedWorkOptimizer.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.ArrayList;
@@ -36,6 +35,7 @@ import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -79,6 +79,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multiset;
+import com.google.common.collect.Sets;
 import com.google.common.collect.TreeMultiset;
 
 /**
@@ -102,8 +103,8 @@ import com.google.common.collect.TreeMultiset;
  *      |            |                  /   \
  *      Op           Op                Op   Op
  *
- * <p>A limitation in the current implementation is that the optimizer does not
- * go beyond a work boundary.
+ * <p>If the extended version of the optimizer is enabled, it can go beyond
+ * a work boundary to find reutilization opportunities.
  *
  * <p>The optimization only works with the Tez execution engine.
  */
@@ -146,13 +147,13 @@ public class SharedWorkOptimizer extends Transform {
       String tableName = tablePair.getKey();
       for (TableScanOperator discardableTsOp : tableNameToOps.get(tableName)) {
         if (removedOps.contains(discardableTsOp)) {
-          LOG.debug("Skip {} as it has been already removed", discardableTsOp);
+          LOG.debug("Skip {} as it has already been removed", discardableTsOp);
           continue;
         }
         Collection<TableScanOperator> prevTsOps = existingOps.get(tableName);
         for (TableScanOperator retainableTsOp : prevTsOps) {
           if (removedOps.contains(retainableTsOp)) {
-            LOG.debug("Skip {} as it has been already removed", retainableTsOp);
+            LOG.debug("Skip {} as it has already been removed", retainableTsOp);
             continue;
           }
 
@@ -167,7 +168,7 @@ public class SharedWorkOptimizer extends Transform {
           // Secondly, we extract information about the part of the tree that can be merged
           // as well as some structural information (memory consumption) that needs to be
           // used to determined whether the merge can happen
-          SharedResult sr = extractSharedOptimizationInfo(
+          SharedResult sr = extractSharedOptimizationInfoForRoot(
                   pctx, optimizerCache, retainableTsOp, discardableTsOp);
 
           // It seems these two operators can be merged.
@@ -197,7 +198,8 @@ public class SharedWorkOptimizer extends Transform {
               }
             }
 
-            LOG.debug("Merging subtree starting at {} into subtree starting at {}", discardableTsOp, retainableTsOp);
+            LOG.debug("Merging subtree starting at {} into subtree starting at {}",
+                discardableTsOp, retainableTsOp);
           } else {
             // Only TS operator
             ExprNodeGenericFuncDesc exprNode = null;
@@ -262,6 +264,7 @@ public class SharedWorkOptimizer extends Transform {
                       !sr.discardableInputOps.contains(sjbi.getTsOp())) {
                 GenTezUtils.removeSemiJoinOperator(
                         pctx, (ReduceSinkOperator) op, sjbi.getTsOp());
+                optimizerCache.tableScanToDPPSource.remove(sjbi.getTsOp(), op);
               }
             } else if (op instanceof AppMasterEventOperator) {
               DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf();
@@ -269,6 +272,7 @@ public class SharedWorkOptimizer extends Transform {
                       !sr.discardableInputOps.contains(dped.getTableScan())) {
                 GenTezUtils.removeSemiJoinOperator(
                         pctx, (AppMasterEventOperator) op, dped.getTableScan());
+                optimizerCache.tableScanToDPPSource.remove(dped.getTableScan(), op);
               }
             }
             LOG.debug("Input operator removed: {}", op);
@@ -292,10 +296,12 @@ public class SharedWorkOptimizer extends Transform {
                   GenTezUtils.removeSemiJoinOperator(pctx,
                           (ReduceSinkOperator) dppSource,
                           (TableScanOperator) sr.retainableOps.get(0));
+                  optimizerCache.tableScanToDPPSource.remove(sr.retainableOps.get(0), op);
                 } else if (dppSource instanceof AppMasterEventOperator) {
                   GenTezUtils.removeSemiJoinOperator(pctx,
                           (AppMasterEventOperator) dppSource,
                           (TableScanOperator) sr.retainableOps.get(0));
+                  optimizerCache.tableScanToDPPSource.remove(sr.retainableOps.get(0), op);
                 }
               }
             }
@@ -328,6 +334,203 @@ public class SharedWorkOptimizer extends Transform {
       LOG.debug("After SharedWorkOptimizer:\n" + Operator.toString(pctx.getTopOps().values()));
     }
 
+    if (pctx.getConf().getBoolVar(ConfVars.HIVE_SHARED_WORK_EXTENDED_OPTIMIZATION)) {
+      // Gather RS operators that 1) belong to root works, i.e., works containing TS operators,
+      // and 2) share the same input operator.
+      // These will be the first targets for extended shared work optimization.
+      Multimap<Operator<?>, ReduceSinkOperator> parentToRsOps = ArrayListMultimap.create();
+      Set<Operator<?>> visited = new HashSet<>();
+      for (Entry<String, TableScanOperator> e : topOps.entrySet()) {
+        gatherReduceSinkOpsByInput(parentToRsOps, visited,
+            findWorkOperators(optimizerCache, e.getValue()));
+      }
+
+      while (!parentToRsOps.isEmpty()) {
+        // As above, we enforce a certain order when we do the reutilization.
+        // In particular, we rank by the data size in the RS multiplied by the number of uses.
+        List<Entry<Operator<?>, Long>> sortedRSGroups =
+            rankOpsByAccumulatedSize(parentToRsOps.keySet());
+        LOG.debug("Sorted operators by size: {}", sortedRSGroups);
+
+        // Execute extended optimization
+        // For each RS, check whether other RS operators in the same work could be merged
+        // into this one. If they are merged, RS operators in the resulting work will be
+        // considered mergeable in the next loop iteration.
+        Multimap<Operator<?>, ReduceSinkOperator> existingRsOps = ArrayListMultimap.create();
+        for (Entry<Operator<?>, Long> rsGroupInfo : sortedRSGroups) {
+          Operator<?> rsParent = rsGroupInfo.getKey();
+          for (ReduceSinkOperator discardableRsOp : parentToRsOps.get(rsParent)) {
+            if (removedOps.contains(discardableRsOp)) {
+              LOG.debug("Skip {} as it has already been removed", discardableRsOp);
+              continue;
+            }
+            Collection<ReduceSinkOperator> otherRsOps = existingRsOps.get(rsParent);
+            for (ReduceSinkOperator retainableRsOp : otherRsOps) {
+              if (removedOps.contains(retainableRsOp)) {
+                LOG.debug("Skip {} as it has already been removed", retainableRsOp);
+                continue;
+              }
+
+              // First we quickly check if the two RS operators can actually be merged.
+              // We already know that these two RS operators have the same parent, but
+              // we need to check whether both RS are actually equal. Further, we check
+              // whether their child is also equal. If any of these conditions are not
+              // met, we are not going to try to merge.
+              boolean mergeable = compareOperator(pctx, retainableRsOp, discardableRsOp) &&
+                  compareOperator(pctx, retainableRsOp.getChildOperators().get(0),
+                      discardableRsOp.getChildOperators().get(0));
+              if (!mergeable) {
+                // Skip
+                LOG.debug("{} and {} cannot be merged", retainableRsOp, discardableRsOp);
+                continue;
+              }
+
+              LOG.debug("Checking additional conditions for merging subtree starting at {}"
+                  + " into subtree starting at {}", discardableRsOp, retainableRsOp);
+
+              // Secondly, we extract information about the part of the tree that can be merged
+              // as well as some structural information (memory consumption) that needs to be
+              // used to determine whether the merge can happen
+              Operator<?> retainableRsOpChild = retainableRsOp.getChildOperators().get(0);
+              Operator<?> discardableRsOpChild = discardableRsOp.getChildOperators().get(0);
+              SharedResult sr = extractSharedOptimizationInfo(
+                  pctx, optimizerCache, retainableRsOp, discardableRsOp,
+                  retainableRsOpChild, discardableRsOpChild);
+
+              // It seems these two operators can be merged.
+              // Check that the plan meets some preconditions before doing it.
+              // In particular, in the presence of map joins in the upstream plan:
+              // - we cannot exceed the noconditional task size, and
+              // - if we already merged the big table, we cannot merge the broadcast
+              // tables.
+              if (sr.retainableOps.isEmpty() || !validPreConditions(pctx, optimizerCache, sr)) {
+                // Skip
+                LOG.debug("{} and {} do not meet preconditions", retainableRsOp, discardableRsOp);
+                continue;
+              }
+
+              // We can merge
+              Operator<?> lastRetainableOp = sr.retainableOps.get(sr.retainableOps.size() - 1);
+              Operator<?> lastDiscardableOp = sr.discardableOps.get(sr.discardableOps.size() - 1);
+              if (lastDiscardableOp.getNumChild() != 0) {
+                List<Operator<? extends OperatorDesc>> allChildren =
+                        Lists.newArrayList(lastDiscardableOp.getChildOperators());
+                for (Operator<? extends OperatorDesc> op : allChildren) {
+                  lastDiscardableOp.getChildOperators().remove(op);
+                  op.replaceParent(lastDiscardableOp, lastRetainableOp);
+                  lastRetainableOp.getChildOperators().add(op);
+                }
+              }
+
+              LOG.debug("Merging subtree starting at {} into subtree starting at {}",
+                  discardableRsOp, retainableRsOp);
+
+              // First we remove the input operators of the expression that
+              // we are going to eliminate
+              for (Operator<?> op : sr.discardableInputOps) {
+                OperatorUtils.removeOperator(op);
+                optimizerCache.removeOp(op);
+                removedOps.add(op);
+                // Remove DPP predicates
+                if (op instanceof ReduceSinkOperator) {
+                  SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op);
+                  if (sjbi != null && !sr.discardableOps.contains(sjbi.getTsOp()) &&
+                          !sr.discardableInputOps.contains(sjbi.getTsOp())) {
+                    GenTezUtils.removeSemiJoinOperator(
+                            pctx, (ReduceSinkOperator) op, sjbi.getTsOp());
+                    optimizerCache.tableScanToDPPSource.remove(sjbi.getTsOp(), op);
+                  }
+                } else if (op instanceof AppMasterEventOperator) {
+                  DynamicPruningEventDesc dped = (DynamicPruningEventDesc) op.getConf();
+                  if (!sr.discardableOps.contains(dped.getTableScan()) &&
+                          !sr.discardableInputOps.contains(dped.getTableScan())) {
+                    GenTezUtils.removeSemiJoinOperator(
+                            pctx, (AppMasterEventOperator) op, dped.getTableScan());
+                    optimizerCache.tableScanToDPPSource.remove(dped.getTableScan(), op);
+                  }
+                }
+                LOG.debug("Input operator removed: {}", op);
+              }
+              // We remove the discardable RS operator
+              OperatorUtils.removeOperator(discardableRsOp);
+              optimizerCache.removeOp(discardableRsOp);
+              removedOps.add(discardableRsOp);
+              LOG.debug("Operator removed: {}", discardableRsOp);
+              // Then we merge the operators of the works we are going to merge
+              optimizerCache.removeOpAndCombineWork(discardableRsOpChild, retainableRsOpChild);
+              // Finally we remove the rest of the expression from the tree
+              for (Operator<?> op : sr.discardableOps) {
+                OperatorUtils.removeOperator(op);
+                optimizerCache.removeOp(op);
+                removedOps.add(op);
+                LOG.debug("Operator removed: {}", op);
+              }
+
+              break;
+            }
+
+            if (removedOps.contains(discardableRsOp)) {
+              // This operator has been removed, remove it from the list of existing operators
+              existingRsOps.remove(rsParent, discardableRsOp);
+            } else {
+              // This operator has not been removed, include it in the list of existing operators
+              existingRsOps.put(rsParent, discardableRsOp);
+            }
+          }
+        }
+
+        // We gather the operators that will be used for the next iteration of the
+        // extended optimization (if any)
+        parentToRsOps = ArrayListMultimap.create();
+        visited = new HashSet<>();
+        for (Entry<Operator<?>, ReduceSinkOperator> e : existingRsOps.entries()) {
+          if (removedOps.contains(e.getValue()) || e.getValue().getNumChild() < 1) {
+            // If 1) RS has been removed, or 2) it does not have a child (for instance, it is a
+            // semijoin RS), we can quickly skip this one
+            continue;
+          }
+          gatherReduceSinkOpsByInput(parentToRsOps, visited,
+              findWorkOperators(optimizerCache, e.getValue().getChildOperators().get(0)));
+        }
+      }
+
+      // Remove unused table scan operators
+      it = topOps.entrySet().iterator();
+      while (it.hasNext()) {
+        Entry<String, TableScanOperator> e = it.next();
+        if (e.getValue().getNumChild() == 0) {
+          it.remove();
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("After SharedWorkExtendedOptimizer:\n"
+            + Operator.toString(pctx.getTopOps().values()));
+      }
+    }
+
+    // If we are running tests, we are going to verify that the contents of the cache
+    // correspond with the contents of the plan, and otherwise we fail.
+    // This check always runs when we are running in test mode, independently of whether
+    // we use the basic or the extended version of the optimizer.
+    if (pctx.getConf().getBoolVar(ConfVars.HIVE_IN_TEST)) {
+      Set<Operator<?>> visited = new HashSet<>();
+      it = topOps.entrySet().iterator();
+      while (it.hasNext()) {
+        Entry<String, TableScanOperator> e = it.next();
+        for (Operator<?> op : OperatorUtils.findOperators(e.getValue(), Operator.class)) {
+          if (!visited.contains(op)) {
+            if (!findWorkOperators(optimizerCache, op).equals(
+                findWorkOperators(op, new HashSet<Operator<?>>()))) {
+              throw new SemanticException("Error in shared work optimizer: operator cache contents"
+                  + "and actual plan differ");
+            }
+            visited.add(op);
+          }
+        }
+      }
+    }
+
     return pctx;
   }
 
@@ -392,16 +595,59 @@ public class SharedWorkOptimizer extends Transform {
       }
     }
     List<Entry<String, Long>> sortedTables =
-            new LinkedList<>(tableToTotalSize.entrySet());
+        new LinkedList<>(tableToTotalSize.entrySet());
     Collections.sort(sortedTables, Collections.reverseOrder(
-            new Comparator<Map.Entry<String, Long>>() {
-              public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
-                return (o1.getValue()).compareTo(o2.getValue());
-              }
-            }));
+        new Comparator<Map.Entry<String, Long>>() {
+          public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
+            return (o1.getValue()).compareTo(o2.getValue());
+          }
+        }));
     return sortedTables;
   }
 
+  private static void gatherReduceSinkOpsByInput(
+      Multimap<Operator<?>, ReduceSinkOperator> parentToRsOps,
+      Set<Operator<?>> visited, Set<Operator<?>> ops) {
+    for (Operator<?> op : ops) {
+      // If the RS has other RS siblings, we will add it to be considered in the next iteration
+      if (op instanceof ReduceSinkOperator && !visited.contains(op)) {
+        Operator<?> parent = op.getParentOperators().get(0);
+        Set<ReduceSinkOperator> s = new LinkedHashSet<>();
+        for (Operator<?> c : parent.getChildOperators()) {
+          if (c instanceof ReduceSinkOperator) {
+            s.add((ReduceSinkOperator) c);
+            visited.add(c);
+          }
+        }
+        if (s.size() > 1) {
+          parentToRsOps.putAll(parent, s);
+        }
+      }
+    }
+  }
+
+  private static List<Entry<Operator<?>, Long>> rankOpsByAccumulatedSize(Set<Operator<?>> opsSet) {
+    Map<Operator<?>, Long> opToTotalSize = new HashMap<>();
+    for (Operator<?> op : opsSet) {
+      long size = op.getStatistics() != null ?
+          op.getStatistics().getDataSize() : 0L;
+      opToTotalSize.put(op,
+          StatsUtils.safeMult(op.getChildOperators().size(), size));
+    }
+    List<Entry<Operator<?>, Long>> sortedOps =
+        new LinkedList<>(opToTotalSize.entrySet());
+    Collections.sort(sortedOps, Collections.reverseOrder(
+        new Comparator<Map.Entry<Operator<?>, Long>>() {
+          public int compare(Map.Entry<Operator<?>, Long> o1, Map.Entry<Operator<?>, Long> o2) {
+            int valCmp = o1.getValue().compareTo(o2.getValue());
+            if (valCmp == 0) {
+              return o1.getKey().toString().compareTo(o2.getKey().toString());
+            }
+            return valCmp;
+          }
+        }));
+    return sortedOps;
+  }
+
   private static boolean areMergeable(ParseContext pctx, SharedWorkOptimizerCache optimizerCache,
           TableScanOperator tsOp1, TableScanOperator tsOp2) throws SemanticException {
     // First we check if the two table scan operators can actually be merged
@@ -486,15 +732,15 @@ public class SharedWorkOptimizer extends Transform {
     return true;
   }
 
-  private static SharedResult extractSharedOptimizationInfo(ParseContext pctx,
+  private static SharedResult extractSharedOptimizationInfoForRoot(ParseContext pctx,
           SharedWorkOptimizerCache optimizerCache,
           TableScanOperator retainableTsOp,
           TableScanOperator discardableTsOp) throws SemanticException {
-    Set<Operator<?>> retainableOps = new LinkedHashSet<>();
-    Set<Operator<?>> discardableOps = new LinkedHashSet<>();
+    LinkedHashSet<Operator<?>> retainableOps = new LinkedHashSet<>();
+    LinkedHashSet<Operator<?>> discardableOps = new LinkedHashSet<>();
     Set<Operator<?>> discardableInputOps = new HashSet<>();
-    long dataSize = 0l;
-    long maxDataSize = 0l;
+    long dataSize = 0L;
+    long maxDataSize = 0L;
 
     retainableOps.add(retainableTsOp);
     discardableOps.add(discardableTsOp);
@@ -503,7 +749,8 @@ public class SharedWorkOptimizer extends Transform {
     if (equalOp1.getNumChild() > 1 || equalOp2.getNumChild() > 1) {
       // TODO: Support checking multiple child operators to merge further.
       discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableOps));
-      return new SharedResult(retainableOps, discardableOps, discardableInputOps, dataSize, maxDataSize);
+      return new SharedResult(retainableOps, discardableOps, discardableInputOps,
+          dataSize, maxDataSize);
     }
     Operator<?> currentOp1 = retainableTsOp.getChildOperators().get(0);
     Operator<?> currentOp2 = discardableTsOp.getChildOperators().get(0);
@@ -532,19 +779,54 @@ public class SharedWorkOptimizer extends Transform {
                 currentOp2.getChildOperators().size() > 1) {
           // TODO: Support checking multiple child operators to merge further.
           discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableOps));
-          discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps, discardableInputOps));
-          return new SharedResult(retainableOps, discardableOps, discardableInputOps, dataSize, maxDataSize);
+          discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps,
+              discardableInputOps));
+          return new SharedResult(retainableOps, discardableOps, discardableInputOps,
+              dataSize, maxDataSize);
         }
         currentOp1 = currentOp1.getChildOperators().get(0);
         currentOp2 = currentOp2.getChildOperators().get(0);
       } else {
         // Bail out
         discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableOps));
-        discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps, discardableInputOps));
-        return new SharedResult(retainableOps, discardableOps, discardableInputOps, dataSize, maxDataSize);
+        discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps,
+            discardableInputOps));
+        return new SharedResult(retainableOps, discardableOps, discardableInputOps,
+            dataSize, maxDataSize);
       }
     }
 
+    return extractSharedOptimizationInfo(pctx, optimizerCache, equalOp1, equalOp2,
+        currentOp1, currentOp2, retainableOps, discardableOps, discardableInputOps, false);
+  }
+
+  private static SharedResult extractSharedOptimizationInfo(ParseContext pctx,
+      SharedWorkOptimizerCache optimizerCache,
+      Operator<?> retainableOpEqualParent,
+      Operator<?> discardableOpEqualParent,
+      Operator<?> retainableOp,
+      Operator<?> discardableOp) throws SemanticException {
+    return extractSharedOptimizationInfo(pctx, optimizerCache,
+        retainableOpEqualParent, discardableOpEqualParent, retainableOp, discardableOp,
+        new LinkedHashSet<>(), new LinkedHashSet<>(), new HashSet<>(), true);
+  }
+
+  private static SharedResult extractSharedOptimizationInfo(ParseContext pctx,
+      SharedWorkOptimizerCache optimizerCache,
+      Operator<?> retainableOpEqualParent,
+      Operator<?> discardableOpEqualParent,
+      Operator<?> retainableOp,
+      Operator<?> discardableOp,
+      LinkedHashSet<Operator<?>> retainableOps,
+      LinkedHashSet<Operator<?>> discardableOps,
+      Set<Operator<?>> discardableInputOps,
+      boolean removeInputBranch) throws SemanticException {
+    Operator<?> equalOp1 = retainableOpEqualParent;
+    Operator<?> equalOp2 = discardableOpEqualParent;
+    Operator<?> currentOp1 = retainableOp;
+    Operator<?> currentOp2 = discardableOp;
+    long dataSize = 0L;
+    long maxDataSize = 0L;
     // Try to merge rest of operators
     while (!(currentOp1 instanceof ReduceSinkOperator)) {
       // Check whether current operators are equal
@@ -563,7 +845,7 @@ public class SharedWorkOptimizer extends Transform {
         for (; idx < currentOp1.getParentOperators().size(); idx++) {
           Operator<?> parentOp1 = currentOp1.getParentOperators().get(idx);
           Operator<?> parentOp2 = currentOp2.getParentOperators().get(idx);
-          if (parentOp1 == equalOp1 && parentOp2 == equalOp2) {
+          if (parentOp1 == equalOp1 && parentOp2 == equalOp2 && !removeInputBranch) {
             continue;
           }
           if ((parentOp1 == equalOp1 && parentOp2 != equalOp2) ||
@@ -572,7 +854,8 @@ public class SharedWorkOptimizer extends Transform {
             break;
           }
           // Compare input
-          List<Operator<?>> removeOpsForCurrentInput = compareAndGatherOps(pctx, parentOp1, parentOp2);
+          List<Operator<?>> removeOpsForCurrentInput =
+              compareAndGatherOps(pctx, parentOp1, parentOp2);
           if (removeOpsForCurrentInput == null) {
             // Inputs are not the same, bail out
             break;
@@ -626,8 +909,10 @@ public class SharedWorkOptimizer extends Transform {
 
     discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableInputOps));
     discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, discardableOps));
-    discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps, discardableInputOps));
-    return new SharedResult(retainableOps, discardableOps, discardableInputOps, dataSize, maxDataSize);
+    discardableInputOps.addAll(gatherDPPBranchOps(pctx, optimizerCache, retainableOps,
+        discardableInputOps));
+    return new SharedResult(retainableOps, discardableOps, discardableInputOps,
+        dataSize, maxDataSize);
   }
 
   private static Multiset<String> extractConjsIgnoringDPPPreds(ExprNodeDesc predicate) {
@@ -656,7 +941,8 @@ public class SharedWorkOptimizer extends Transform {
     Set<Operator<?>> dppBranches = new HashSet<>();
     for (Operator<?> op : ops) {
       if (op instanceof TableScanOperator) {
-        Collection<Operator<?>> c = optimizerCache.tableScanToDPPSource.get((TableScanOperator) op);
+        Collection<Operator<?>> c = optimizerCache.tableScanToDPPSource
+            .get((TableScanOperator) op);
         for (Operator<?> dppSource : c) {
           // Remove the branches
           Operator<?> currentOp = dppSource;
@@ -676,7 +962,8 @@ public class SharedWorkOptimizer extends Transform {
     Set<Operator<?>> dppBranches = new HashSet<>();
     for (Operator<?> op : ops) {
       if (op instanceof TableScanOperator) {
-        Collection<Operator<?>> c = optimizerCache.tableScanToDPPSource.get((TableScanOperator) op);
+        Collection<Operator<?>> c = optimizerCache.tableScanToDPPSource
+            .get((TableScanOperator) op);
         for (Operator<?> dppSource : c) {
           Set<Operator<?>> ascendants =
               findAscendantWorkOperators(pctx, optimizerCache, dppSource);
@@ -711,6 +998,11 @@ public class SharedWorkOptimizer extends Transform {
       return false;
     }
 
+    if (gather && op2.getChildOperators().size() > 1) {
+      // If the second operator has more than one child, we stop gathering
+      gather = false;
+    }
+
     if (gather) {
       result.add(op2);
     }
@@ -724,12 +1016,8 @@ public class SharedWorkOptimizer extends Transform {
       for (int i = 0; i < op1ParentOperators.size(); i++) {
         Operator<?> op1ParentOp = op1ParentOperators.get(i);
         Operator<?> op2ParentOp = op2ParentOperators.get(i);
-        boolean mergeable;
-        if (gather && op2ParentOp.getChildOperators().size() < 2) {
-          mergeable = compareAndGatherOps(pctx, op1ParentOp, op2ParentOp, result, true);
-        } else {
-          mergeable = compareAndGatherOps(pctx, op1ParentOp, op2ParentOp, result, false);
-        }
+        boolean mergeable =
+            compareAndGatherOps(pctx, op1ParentOp, op2ParentOp, result, gather);
         if (!mergeable) {
           return false;
         }
@@ -741,7 +1029,6 @@ public class SharedWorkOptimizer extends Transform {
     return true;
   }
 
-  @SuppressWarnings({ "rawtypes", "unchecked" })
   private static boolean compareOperator(ParseContext pctx, Operator<?> op1, Operator<?> op2)
           throws SemanticException {
     if (!op1.getClass().getName().equals(op2.getClass().getName())) {
@@ -809,21 +1096,21 @@ public class SharedWorkOptimizer extends Transform {
       return false;
     }
 
-    TableScanOperator tsOp1 = (TableScanOperator) sr.retainableOps.get(0);
-    TableScanOperator tsOp2 = (TableScanOperator) sr.discardableOps.get(0);
+    Operator<?> op1 = sr.retainableOps.get(0);
+    Operator<?> op2 = sr.discardableOps.get(0);
 
-    // 1) The set of operators in the works of the TS operators need to meet
+    // 1) The set of operators in the works that we are merging need to meet
     // some requirements. In particular:
-    // 1.1. None of the works that contain the TS operators can contain a Union
+    // 1.1. None of the works that we are merging can contain a Union
     // operator. This is not supported yet as we might end up with cycles in
     // the Tez DAG.
     // 1.2. There cannot be more than one DummyStore operator in the new resulting
-    // work when the TS operators are merged. This is due to an assumption in
+    // work when the operators are merged. This is due to an assumption in
     // MergeJoinProc that needs to be further explored.
     // If any of these conditions are not met, we cannot merge.
     // TODO: Extend rule so it can be applied for these cases.
-    final Set<Operator<?>> workOps1 = findWorkOperators(optimizerCache, tsOp1);
-    final Set<Operator<?>> workOps2 = findWorkOperators(optimizerCache, tsOp2);
+    final Set<Operator<?>> workOps1 = findWorkOperators(optimizerCache, op1);
+    final Set<Operator<?>> workOps2 = findWorkOperators(optimizerCache, op2);
     boolean foundDummyStoreOp = false;
     for (Operator<?> op : workOps1) {
       if (op instanceof UnionOperator) {
@@ -853,8 +1140,8 @@ public class SharedWorkOptimizer extends Transform {
     // If we do, we cannot merge. The reason is that Tez currently does
     // not support parallel edges, i.e., multiple edges from same work x
     // into same work y.
-    final Set<Operator<?>> outputWorksOps1 = findChildWorkOperators(pctx, optimizerCache, tsOp1);
-    final Set<Operator<?>> outputWorksOps2 = findChildWorkOperators(pctx, optimizerCache, tsOp2);
+    final Set<Operator<?>> outputWorksOps1 = findChildWorkOperators(pctx, optimizerCache, op1);
+    final Set<Operator<?>> outputWorksOps2 = findChildWorkOperators(pctx, optimizerCache, op2);
     if (!Collections.disjoint(outputWorksOps1, outputWorksOps2)) {
       // We cannot merge
       return false;
@@ -866,10 +1153,19 @@ public class SharedWorkOptimizer extends Transform {
     //   Work2   Work3                                 Work2
     //
     // If we do, we cannot merge. The reason is the same as above, currently
-    // Tez currently does not support parallel edges.
-    final Set<Operator<?>> inputWorksOps1 = findParentWorkOperators(pctx, optimizerCache, tsOp1);
+    // Tez does not support parallel edges.
+    //
+    // In the check, we exclude the inputs to the root operator that we are trying
+    // to merge (only useful for extended merging, as TS operators do not have inputs).
+    final Set<Operator<?>> excludeOps1 = sr.retainableOps.get(0).getNumParent() > 0 ?
+        ImmutableSet.copyOf(sr.retainableOps.get(0).getParentOperators()) : ImmutableSet.of();
+    final Set<Operator<?>> inputWorksOps1 =
+        findParentWorkOperators(pctx, optimizerCache, op1, excludeOps1);
+    final Set<Operator<?>> excludeOps2 = sr.discardableOps.get(0).getNumParent() > 0 ?
+        Sets.union(ImmutableSet.copyOf(sr.discardableOps.get(0).getParentOperators()), sr.discardableInputOps) :
+            sr.discardableInputOps;
     final Set<Operator<?>> inputWorksOps2 =
-            findParentWorkOperators(pctx, optimizerCache, tsOp2, sr.discardableInputOps);
+        findParentWorkOperators(pctx, optimizerCache, op2, excludeOps2);
     if (!Collections.disjoint(inputWorksOps1, inputWorksOps2)) {
       // We cannot merge
       return false;
@@ -885,9 +1181,9 @@ public class SharedWorkOptimizer extends Transform {
     //
     // If we do, we cannot merge, as we would end up with a cycle in the DAG.
     final Set<Operator<?>> descendantWorksOps1 =
-            findDescendantWorkOperators(pctx, optimizerCache, tsOp1, sr.discardableInputOps);
+            findDescendantWorkOperators(pctx, optimizerCache, op1, sr.discardableInputOps);
     final Set<Operator<?>> descendantWorksOps2 =
-            findDescendantWorkOperators(pctx, optimizerCache, tsOp2, sr.discardableInputOps);
+            findDescendantWorkOperators(pctx, optimizerCache, op2, sr.discardableInputOps);
     if (!Collections.disjoint(descendantWorksOps1, workOps2)
             || !Collections.disjoint(workOps1, descendantWorksOps2)) {
       return false;
@@ -1120,6 +1416,12 @@ public class SharedWorkOptimizer extends Transform {
       this.dataSize = dataSize;
       this.maxDataSize = maxDataSize;
     }
+
+    @Override
+    public String toString() {
+      return "SharedResult { " + this.retainableOps + "; " + this.discardableOps + "; "
+          + this.discardableInputOps + "};";
+    }
   }
 
   /** Cache to accelerate optimization */
@@ -1173,6 +1475,11 @@ public class SharedWorkOptimizer extends Transform {
         }
       }
     }
+
+    @Override
+    public String toString() {
+      return "SharedWorkOptimizerCache { \n" + operatorToWorkOperators.toString() + "\n };";
+    }
   }
 
 }
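
In short, once the basic optimizer has merged table scans within a work, the extended pass
repeatedly looks for sibling ReduceSink operators that hang off the same parent, are equal to
each other, and have equal children, and merges the works below them; every merge can expose
new mergeable RS siblings for the next iteration. A minimal HiveQL sketch of the plan shape
this targets, taken from the new sharedworkext.q test (it assumes the standard src test
table):

  SELECT a.key FROM
  (
    SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
    GROUP BY a1.key, a2.value
  ) a
  JOIN
  (
    SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
    GROUP BY a1.key, a2.value
  ) b
  ON a.key = b.key;

As the golden file further below shows, the self-join of src and the partial aggregation now
run once (a single Reducer 2 emitting two identical RS outputs); only the final merge of the
group-by stays duplicated (Reducer 3 and Reducer 5), because merging those works as well
would give the outer join two parallel edges from the same vertex, which Tez does not
support.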

http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/ql/src/test/queries/clientpositive/sharedworkext.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/sharedworkext.q b/ql/src/test/queries/clientpositive/sharedworkext.q
new file mode 100644
index 0000000..b1801ea
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/sharedworkext.q
@@ -0,0 +1,53 @@
+EXPLAIN
+SELECT a.key FROM
+(
+  SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+  GROUP BY a1.key, a2.value
+) a
+JOIN
+(
+  SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+  GROUP BY a1.key, a2.value
+) b
+ON a.key = b.key;
+
+SELECT a.key FROM
+(
+  SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+  GROUP BY a1.key, a2.value
+) a
+JOIN
+(
+  SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+  GROUP BY a1.key, a2.value
+) b
+ON a.key = b.key;
+
+EXPLAIN
+SELECT a.key FROM
+(
+  SELECT rank() OVER (ORDER BY key) AS key FROM
+    (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+    GROUP BY a1.key, a2.value) a
+) a
+JOIN
+(
+  SELECT rank() OVER (ORDER BY key) AS key FROM
+    (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+    GROUP BY a1.key, a2.value) b
+) b
+ON a.key = b.key;
+
+SELECT a.key FROM
+(
+  SELECT rank() OVER (ORDER BY key) AS key FROM
+    (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+    GROUP BY a1.key, a2.value) a
+) a
+JOIN
+(
+  SELECT rank() OVER (ORDER BY key) AS key FROM
+    (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+    GROUP BY a1.key, a2.value) b
+) b
+ON a.key = b.key;
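
To see what the extended pass adds on top of the basic optimizer, the same EXPLAIN can be run
with the new flag off and then on; a sketch using the second query above (again assuming the
standard src test table):

  set hive.optimize.shared.work.extended=false;
  EXPLAIN
  SELECT a.key FROM
  (
    SELECT rank() OVER (ORDER BY key) AS key FROM
      (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
      GROUP BY a1.key, a2.value) a
  ) a
  JOIN
  (
    SELECT rank() OVER (ORDER BY key) AS key FROM
      (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
      GROUP BY a1.key, a2.value) b
  ) b
  ON a.key = b.key;
  set hive.optimize.shared.work.extended=true;
  -- rerunning the same EXPLAIN should now show a single group-by vertex feeding
  -- both rank() branches, as in the "Reducer 4 <- Reducer 3" and
  -- "Reducer 6 <- Reducer 3" edges of the golden file below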

http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/explainuser_1.q.out b/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
index 8ab5e3a..2fb1854 100644
--- a/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
+++ b/ql/src/test/results/clientpositive/llap/explainuser_1.q.out
@@ -2363,10 +2363,9 @@ Plan optimized by CBO.
 
 Vertex dependency in root stage
 Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE), Reducer 6 (CUSTOM_SIMPLE_EDGE)
-Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
 Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
 Reducer 6 <- Map 5 (CUSTOM_SIMPLE_EDGE)
-Reducer 7 <- Map 5 (CUSTOM_SIMPLE_EDGE)
 
 Stage-0
   Fetch Operator
@@ -2384,41 +2383,37 @@ Stage-0
                 predicate:((_col2 = 0) or (_col5 is null and _col1 is not null and (_col3 >= _col2)))
                 Merge Join Operator [MERGEJOIN_37] (rows=26 width=141)
                   Conds:RS_24.UDFToDouble(_col1)=RS_25._col0(Left Outer),Output:["_col0","_col1","_col2","_col3","_col5"]
+                <-Reducer 6 [SIMPLE_EDGE] llap
+                  SHUFFLE [RS_25]
+                    PartitionCols:_col0
+                    Select Operator [SEL_20] (rows=1 width=12)
+                      Output:["_col0","_col1"]
+                      Group By Operator [GBY_7] (rows=1 width=8)
+                        Output:["_col0"],aggregations:["avg(VALUE._col0)"]
+                      <-Map 5 [CUSTOM_SIMPLE_EDGE] llap
+                        PARTITION_ONLY_SHUFFLE [RS_6]
+                          Group By Operator [GBY_5] (rows=1 width=76)
+                            Output:["_col0"],aggregations:["avg(p_size)"]
+                            Filter Operator [FIL_33] (rows=8 width=4)
+                              predicate:(p_size < 10)
+                              TableScan [TS_2] (rows=26 width=4)
+                                default@part,part,Tbl:COMPLETE,Col:COMPLETE,Output:["p_size"]
                 <-Reducer 2 [SIMPLE_EDGE] llap
                   SHUFFLE [RS_24]
                     PartitionCols:UDFToDouble(_col1)
                     Merge Join Operator [MERGEJOIN_36] (rows=26 width=141)
                       Conds:(Inner),Output:["_col0","_col1","_col2","_col3"]
+                    <-Reducer 6 [CUSTOM_SIMPLE_EDGE] llap
+                      SHUFFLE [RS_22]
+                        Group By Operator [GBY_12] (rows=1 width=16)
+                          Output:["_col0","_col1"],aggregations:["count()","count(_col0)"]
+                           Please refer to the previous Group By Operator [GBY_7]
                     <-Map 1 [CUSTOM_SIMPLE_EDGE] llap
                       PARTITION_ONLY_SHUFFLE [RS_21]
                         Select Operator [SEL_1] (rows=26 width=125)
                           Output:["_col0","_col1"]
                           TableScan [TS_0] (rows=26 width=125)
                             default@part,part,Tbl:COMPLETE,Col:COMPLETE,Output:["p_name","p_size"]
-                    <-Reducer 6 [CUSTOM_SIMPLE_EDGE] llap
-                      PARTITION_ONLY_SHUFFLE [RS_22]
-                        Group By Operator [GBY_12] (rows=1 width=16)
-                          Output:["_col0","_col1"],aggregations:["count()","count(_col0)"]
-                          Group By Operator [GBY_7] (rows=1 width=8)
-                            Output:["_col0"],aggregations:["avg(VALUE._col0)"]
-                          <-Map 5 [CUSTOM_SIMPLE_EDGE] llap
-                            PARTITION_ONLY_SHUFFLE [RS_6]
-                              Group By Operator [GBY_5] (rows=1 width=76)
-                                Output:["_col0"],aggregations:["avg(p_size)"]
-                                Filter Operator [FIL_33] (rows=8 width=4)
-                                  predicate:(p_size < 10)
-                                  TableScan [TS_2] (rows=26 width=4)
-                                    default@part,part,Tbl:COMPLETE,Col:COMPLETE,Output:["p_size"]
-                <-Reducer 7 [SIMPLE_EDGE] llap
-                  SHUFFLE [RS_25]
-                    PartitionCols:_col0
-                    Select Operator [SEL_20] (rows=1 width=12)
-                      Output:["_col0","_col1"]
-                      Group By Operator [GBY_19] (rows=1 width=8)
-                        Output:["_col0"],aggregations:["avg(VALUE._col0)"]
-                      <-Map 5 [CUSTOM_SIMPLE_EDGE] llap
-                        PARTITION_ONLY_SHUFFLE [RS_18]
-                           Please refer to the previous Group By Operator [GBY_5]
 
 PREHOOK: query: explain select b.p_mfgr, min(p_retailprice) 
 from part b 

http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/ql/src/test/results/clientpositive/llap/sharedworkext.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/sharedworkext.q.out b/ql/src/test/results/clientpositive/llap/sharedworkext.q.out
new file mode 100644
index 0000000..e56b1ce
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/sharedworkext.q.out
@@ -0,0 +1,1076 @@
+PREHOOK: query: EXPLAIN
+SELECT a.key FROM
+(
+  SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+  GROUP BY a1.key, a2.value
+) a
+JOIN
+(
+  SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+  GROUP BY a1.key, a2.value
+) b
+ON a.key = b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT a.key FROM
+(
+  SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+  GROUP BY a1.key, a2.value
+) a
+JOIN
+(
+  SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+  GROUP BY a1.key, a2.value
+) b
+ON a.key = b.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 5 (SIMPLE_EDGE)
+        Reducer 5 <- Reducer 2 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: a2
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col1 (type: string)
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 6 
+            Map Operator Tree:
+                TableScan
+                  alias: a1
+                  Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                outputColumnNames: _col1, _col2
+                Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE
+                Group By Operator
+                  keys: _col2 (type: string), _col1 (type: string)
+                  mode: hash
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string), _col1 (type: string)
+                    sort order: ++
+                    Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                    Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string), _col1 (type: string)
+                    sort order: ++
+                    Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                    Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+        Reducer 3 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: string), KEY._col1 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: _col0 (type: string)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string)
+                    sort order: +
+                    Map-reduce partition columns: _col0 (type: string)
+                    Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+        Reducer 4 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                outputColumnNames: _col0
+                Statistics: Num rows: 528 Data size: 45936 Basic stats: COMPLETE Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 528 Data size: 45936 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 5 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: string), KEY._col1 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: _col0 (type: string)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string)
+                    sort order: +
+                    Map-reduce partition columns: _col0 (type: string)
+                    Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT a.key FROM
+(
+  SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+  GROUP BY a1.key, a2.value
+) a
+JOIN
+(
+  SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+  GROUP BY a1.key, a2.value
+) b
+ON a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT a.key FROM
+(
+  SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+  GROUP BY a1.key, a2.value
+) a
+JOIN
+(
+  SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+  GROUP BY a1.key, a2.value
+) b
+ON a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+0
+10
+100
+103
+104
+105
+11
+111
+113
+114
+116
+118
+119
+12
+120
+125
+126
+128
+129
+131
+133
+134
+136
+137
+138
+143
+145
+146
+149
+15
+150
+152
+153
+155
+156
+157
+158
+160
+162
+163
+164
+165
+166
+167
+168
+169
+17
+170
+172
+174
+175
+176
+177
+178
+179
+18
+180
+181
+183
+186
+187
+189
+19
+190
+191
+192
+193
+194
+195
+196
+197
+199
+2
+20
+200
+201
+202
+203
+205
+207
+208
+209
+213
+214
+216
+217
+218
+219
+221
+222
+223
+224
+226
+228
+229
+230
+233
+235
+237
+238
+239
+24
+241
+242
+244
+247
+248
+249
+252
+255
+256
+257
+258
+26
+260
+262
+263
+265
+266
+27
+272
+273
+274
+275
+277
+278
+28
+280
+281
+282
+283
+284
+285
+286
+287
+288
+289
+291
+292
+296
+298
+30
+302
+305
+306
+307
+308
+309
+310
+311
+315
+316
+317
+318
+321
+322
+323
+325
+327
+33
+331
+332
+333
+335
+336
+338
+339
+34
+341
+342
+344
+345
+348
+35
+351
+353
+356
+360
+362
+364
+365
+366
+367
+368
+369
+37
+373
+374
+375
+377
+378
+379
+382
+384
+386
+389
+392
+393
+394
+395
+396
+397
+399
+4
+400
+401
+402
+403
+404
+406
+407
+409
+41
+411
+413
+414
+417
+418
+419
+42
+421
+424
+427
+429
+43
+430
+431
+432
+435
+436
+437
+438
+439
+44
+443
+444
+446
+448
+449
+452
+453
+454
+455
+457
+458
+459
+460
+462
+463
+466
+467
+468
+469
+47
+470
+472
+475
+477
+478
+479
+480
+481
+482
+483
+484
+485
+487
+489
+490
+491
+492
+493
+494
+495
+496
+497
+498
+5
+51
+53
+54
+57
+58
+64
+65
+66
+67
+69
+70
+72
+74
+76
+77
+78
+8
+80
+82
+83
+84
+85
+86
+87
+9
+90
+92
+95
+96
+97
+98
+PREHOOK: query: EXPLAIN
+SELECT a.key FROM
+(
+  SELECT rank() OVER (ORDER BY key) AS key FROM
+    (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+    GROUP BY a1.key, a2.value) a
+) a
+JOIN
+(
+  SELECT rank() OVER (ORDER BY key) AS key FROM
+    (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+    GROUP BY a1.key, a2.value) b
+) b
+ON a.key = b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+SELECT a.key FROM
+(
+  SELECT rank() OVER (ORDER BY key) AS key FROM
+    (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+    GROUP BY a1.key, a2.value) a
+) a
+JOIN
+(
+  SELECT rank() OVER (ORDER BY key) AS key FROM
+    (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+    GROUP BY a1.key, a2.value) b
+) b
+ON a.key = b.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 7 (SIMPLE_EDGE)
+        Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE)
+        Reducer 5 <- Reducer 4 (SIMPLE_EDGE), Reducer 6 (SIMPLE_EDGE)
+        Reducer 6 <- Reducer 3 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: a2
+                  Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 89000 Basic stats: COMPLETE Column stats: COMPLETE
+                        value expressions: _col1 (type: string)
+            Execution mode: llap
+            LLAP IO: no inputs
+        Map 7 
+            Map Operator Tree:
+                TableScan
+                  alias: a1
+                  Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 43500 Basic stats: COMPLETE Column stats: COMPLETE
+            Execution mode: llap
+            LLAP IO: no inputs
+        Reducer 2 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: string)
+                  1 _col0 (type: string)
+                outputColumnNames: _col1, _col2
+                Statistics: Num rows: 809 Data size: 144002 Basic stats: COMPLETE Column stats: COMPLETE
+                Group By Operator
+                  keys: _col1 (type: string), _col2 (type: string)
+                  mode: hash
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string), _col1 (type: string)
+                    sort order: ++
+                    Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                    Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+        Reducer 3 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Group By Operator
+                keys: KEY._col0 (type: string), KEY._col1 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+                Select Operator
+                  expressions: _col1 (type: string)
+                  outputColumnNames: _col0
+                  Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    key expressions: 0 (type: int), _col0 (type: string)
+                    sort order: ++
+                    Map-reduce partition columns: 0 (type: int)
+                    Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    key expressions: 0 (type: int), _col0 (type: string)
+                    sort order: ++
+                    Map-reduce partition columns: 0 (type: int)
+                    Statistics: Num rows: 404 Data size: 71912 Basic stats: COMPLETE Column stats: COMPLETE
+        Reducer 4 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey1 (type: string)
+                outputColumnNames: _col0
+                Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE
+                PTF Operator
+                  Function definitions:
+                      Input definition
+                        input alias: ptf_0
+                        output shape: _col0: string
+                        type: WINDOWING
+                      Windowing table definition
+                        input alias: ptf_1
+                        name: windowingtablefunction
+                        order by: _col0 ASC NULLS FIRST
+                        partition by: 0
+                        raw input shape:
+                        window functions:
+                            window function definition
+                              alias: rank_window_0
+                              arguments: _col0
+                              name: rank
+                              window function: GenericUDAFRankEvaluator
+                              window frame: ROWS PRECEDING(MAX)~FOLLOWING(MAX)
+                              isPivotResult: true
+                  Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: rank_window_0 is not null (type: boolean)
+                    Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: rank_window_0 (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE
+        Reducer 5 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Merge Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                keys:
+                  0 _col0 (type: int)
+                  1 _col0 (type: int)
+                outputColumnNames: _col0
+                Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 6 
+            Execution mode: llap
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey1 (type: string)
+                outputColumnNames: _col0
+                Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE
+                PTF Operator
+                  Function definitions:
+                      Input definition
+                        input alias: ptf_0
+                        output shape: _col0: string
+                        type: WINDOWING
+                      Windowing table definition
+                        input alias: ptf_1
+                        name: windowingtablefunction
+                        order by: _col0 ASC NULLS FIRST
+                        partition by: 0
+                        raw input shape:
+                        window functions:
+                            window function definition
+                              alias: rank_window_0
+                              arguments: _col0
+                              name: rank
+                              window function: GenericUDAFRankEvaluator
+                              window frame: ROWS PRECEDING(MAX)~FOLLOWING(MAX)
+                              isPivotResult: true
+                  Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE
+                  Filter Operator
+                    predicate: rank_window_0 is not null (type: boolean)
+                    Statistics: Num rows: 404 Data size: 35148 Basic stats: COMPLETE Column stats: COMPLETE
+                    Select Operator
+                      expressions: rank_window_0 (type: int)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: int)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: int)
+                        Statistics: Num rows: 404 Data size: 1616 Basic stats: COMPLETE Column stats: COMPLETE
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
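[Editor's note] Note where the sharing surfaces in this plan: the join and GROUP BY run once (Reducer 2 and Reducer 3), and Reducer 3 carries two identical Reduce Output Operators so the same aggregated rows feed both windowing branches (Reducer 4 and Reducer 6). Only the rank() evaluation itself remains duplicated downstream of the shared shuffle, before the branches meet again in Reducer 5.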
+PREHOOK: query: SELECT a.key FROM
+(
+  SELECT rank() OVER (ORDER BY key) AS key FROM
+    (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+    GROUP BY a1.key, a2.value) a
+) a
+JOIN
+(
+  SELECT rank() OVER (ORDER BY key) AS key FROM
+    (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+    GROUP BY a1.key, a2.value) b
+) b
+ON a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT a.key FROM
+(
+  SELECT rank() OVER (ORDER BY key) AS key FROM
+    (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+    GROUP BY a1.key, a2.value) a
+) a
+JOIN
+(
+  SELECT rank() OVER (ORDER BY key) AS key FROM
+    (SELECT a1.key, a2.value FROM src a1 JOIN src a2 ON (a1.key = a2.key)
+    GROUP BY a1.key, a2.value) b
+) b
+ON a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+11
+12
+13
+14
+15
+16
+17
+18
+19
+20
+21
+22
+23
+24
+25
+26
+27
+28
+29
+30
+31
+32
+33
+34
+35
+36
+37
+38
+39
+40
+41
+42
+43
+44
+45
+46
+47
+48
+49
+50
+51
+52
+53
+54
+55
+56
+57
+58
+59
+60
+61
+62
+63
+64
+65
+66
+67
+68
+69
+70
+71
+72
+73
+74
+75
+76
+77
+78
+79
+80
+81
+82
+83
+84
+85
+86
+87
+88
+89
+90
+91
+92
+93
+94
+95
+96
+97
+98
+99
+100
+101
+102
+103
+104
+105
+106
+107
+108
+109
+110
+111
+112
+113
+114
+115
+116
+117
+118
+119
+120
+121
+122
+123
+124
+125
+126
+127
+128
+129
+130
+131
+132
+133
+134
+135
+136
+137
+138
+139
+140
+141
+142
+143
+144
+145
+146
+147
+148
+149
+150
+151
+152
+153
+154
+155
+156
+157
+158
+159
+160
+161
+162
+163
+164
+165
+166
+167
+168
+169
+170
+171
+172
+173
+174
+175
+176
+177
+178
+179
+180
+181
+182
+183
+184
+185
+186
+187
+188
+189
+190
+191
+192
+193
+194
+195
+196
+197
+198
+199
+200
+201
+202
+203
+204
+205
+206
+207
+208
+209
+210
+211
+212
+213
+214
+215
+216
+217
+218
+219
+220
+221
+222
+223
+224
+225
+226
+227
+228
+229
+230
+231
+232
+233
+234
+235
+236
+237
+238
+239
+240
+241
+242
+243
+244
+245
+246
+247
+248
+249
+250
+251
+252
+253
+254
+255
+256
+257
+258
+259
+260
+261
+262
+263
+264
+265
+266
+267
+268
+269
+270
+271
+272
+273
+274
+275
+276
+277
+278
+279
+280
+281
+282
+283
+284
+285
+286
+287
+288
+289
+290
+291
+292
+293
+294
+295
+296
+297
+298
+299
+300
+301
+302
+303
+304
+305
+306
+307
+308
+309

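[Editor's note] The hunks that follow regenerate existing golden files, and they all show the same effect from the opposite direction: plans shrink. In subquery_multi.q.out below, the old Reducer 9 (a second join of Map 6 with the map now numbered 10) and Reducer 10 collapse into a single Reducer 9 that reads the already-computed output of Reducer 7, Reducer 7 gains a second output that pre-aggregates for it, and the map-side scans lose their duplicated Reduce Output Operators once one shuffle can serve both consumers.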
http://git-wip-us.apache.org/repos/asf/hive/blob/96a409e1/ql/src/test/results/clientpositive/llap/subquery_multi.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/subquery_multi.q.out b/ql/src/test/results/clientpositive/llap/subquery_multi.q.out
index 975fd13..45f698b 100644
--- a/ql/src/test/results/clientpositive/llap/subquery_multi.q.out
+++ b/ql/src/test/results/clientpositive/llap/subquery_multi.q.out
@@ -1663,13 +1663,12 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 10 <- Reducer 9 (SIMPLE_EDGE)
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
-        Reducer 4 <- Reducer 10 (ONE_TO_ONE_EDGE), Reducer 3 (SIMPLE_EDGE)
-        Reducer 7 <- Map 11 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
+        Reducer 7 <- Map 10 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
         Reducer 8 <- Reducer 7 (SIMPLE_EDGE)
-        Reducer 9 <- Map 11 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
+        Reducer 9 <- Reducer 7 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -1692,7 +1691,7 @@ STAGE PLANS:
                         value expressions: _col0 (type: int), _col2 (type: string), _col3 (type: string), _col5 (type: int), _col6 (type: string), _col7 (type: double), _col8 (type: string)
             Execution mode: llap
             LLAP IO: no inputs
-        Map 11 
+        Map 10 
             Map Operator Tree:
                 TableScan
                   alias: pp
@@ -1714,11 +1713,6 @@ STAGE PLANS:
                           sort order: ++
                           Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
                           Statistics: Num rows: 13 Data size: 2548 Basic stats: COMPLETE Column stats: COMPLETE
-                        Reduce Output Operator
-                          key expressions: _col0 (type: string), _col1 (type: string)
-                          sort order: ++
-                          Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
-                          Statistics: Num rows: 13 Data size: 2548 Basic stats: COMPLETE Column stats: COMPLETE
             Execution mode: llap
             LLAP IO: no inputs
         Map 5 
@@ -1763,35 +1757,8 @@ STAGE PLANS:
                         Map-reduce partition columns: _col1 (type: string), _col0 (type: string)
                         Statistics: Num rows: 26 Data size: 7488 Basic stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col2 (type: string)
-                      Reduce Output Operator
-                        key expressions: _col1 (type: string), _col0 (type: string)
-                        sort order: ++
-                        Map-reduce partition columns: _col1 (type: string), _col0 (type: string)
-                        Statistics: Num rows: 26 Data size: 7488 Basic stats: COMPLETE Column stats: COMPLETE
-                        value expressions: _col2 (type: string)
             Execution mode: llap
             LLAP IO: no inputs
-        Reducer 10 
-            Execution mode: llap
-            Reduce Operator Tree:
-              Group By Operator
-                keys: KEY._col0 (type: string), KEY._col1 (type: string)
-                mode: mergepartial
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
-                Filter Operator
-                  predicate: _col0 is not null (type: boolean)
-                  Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
-                  Select Operator
-                    expressions: _col0 (type: string), _col1 (type: string), true (type: boolean)
-                    outputColumnNames: _col0, _col1, _col2
-                    Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: string), _col1 (type: string)
-                      sort order: ++
-                      Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
-                      Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
-                      value expressions: _col2 (type: boolean)
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
@@ -1874,6 +1841,16 @@ STAGE PLANS:
                     Map-reduce partition columns: _col0 (type: string)
                     Statistics: Num rows: 7 Data size: 840 Basic stats: COMPLETE Column stats: COMPLETE
                     value expressions: _col1 (type: bigint), _col2 (type: bigint)
+                Group By Operator
+                  keys: _col2 (type: string), _col1 (type: string)
+                  mode: hash
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string), _col1 (type: string)
+                    sort order: ++
+                    Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                    Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
         Reducer 8 
             Execution mode: llap
             Reduce Operator Tree:
@@ -1892,24 +1869,24 @@ STAGE PLANS:
         Reducer 9 
             Execution mode: llap
             Reduce Operator Tree:
-              Merge Join Operator
-                condition map:
-                     Left Semi Join 0 to 1
-                keys:
-                  0 _col1 (type: string), _col0 (type: string)
-                  1 _col0 (type: string), _col1 (type: string)
-                outputColumnNames: _col1, _col2
-                Statistics: Num rows: 14 Data size: 2744 Basic stats: COMPLETE Column stats: COMPLETE
-                Group By Operator
-                  keys: _col2 (type: string), _col1 (type: string)
-                  mode: hash
-                  outputColumnNames: _col0, _col1
+              Group By Operator
+                keys: KEY._col0 (type: string), KEY._col1 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
+                Filter Operator
+                  predicate: _col0 is not null (type: boolean)
                   Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
-                  Reduce Output Operator
-                    key expressions: _col0 (type: string), _col1 (type: string)
-                    sort order: ++
-                    Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
-                    Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    expressions: _col0 (type: string), _col1 (type: string), true (type: boolean)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: string), _col1 (type: string)
+                      sort order: ++
+                      Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                      Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
+                      value expressions: _col2 (type: boolean)
 
   Stage: Stage-0
     Fetch Operator
@@ -2143,13 +2120,12 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 10 <- Reducer 9 (SIMPLE_EDGE)
         Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 5 (SIMPLE_EDGE)
         Reducer 3 <- Reducer 2 (SIMPLE_EDGE), Reducer 8 (ONE_TO_ONE_EDGE)
-        Reducer 4 <- Reducer 10 (ONE_TO_ONE_EDGE), Reducer 3 (SIMPLE_EDGE)
-        Reducer 7 <- Map 11 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
+        Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 9 (ONE_TO_ONE_EDGE)
+        Reducer 7 <- Map 10 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
         Reducer 8 <- Reducer 7 (SIMPLE_EDGE)
-        Reducer 9 <- Map 11 (SIMPLE_EDGE), Map 6 (SIMPLE_EDGE)
+        Reducer 9 <- Reducer 7 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -2172,7 +2148,7 @@ STAGE PLANS:
                         value expressions: _col0 (type: int), _col2 (type: string), _col3 (type: string), _col5 (type: int), _col7 (type: double), _col8 (type: string)
             Execution mode: llap
             LLAP IO: no inputs
-        Map 11 
+        Map 10 
             Map Operator Tree:
                 TableScan
                   alias: pp
@@ -2194,11 +2170,6 @@ STAGE PLANS:
                           sort order: ++
                           Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
                           Statistics: Num rows: 13 Data size: 2548 Basic stats: COMPLETE Column stats: COMPLETE
-                        Reduce Output Operator
-                          key expressions: _col0 (type: string), _col1 (type: string)
-                          sort order: ++
-                          Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
-                          Statistics: Num rows: 13 Data size: 2548 Basic stats: COMPLETE Column stats: COMPLETE
             Execution mode: llap
             LLAP IO: no inputs
         Map 5 
@@ -2243,35 +2214,8 @@ STAGE PLANS:
                         Map-reduce partition columns: _col1 (type: string), _col0 (type: string)
                         Statistics: Num rows: 26 Data size: 7488 Basic stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col2 (type: string)
-                      Reduce Output Operator
-                        key expressions: _col1 (type: string), _col0 (type: string)
-                        sort order: ++
-                        Map-reduce partition columns: _col1 (type: string), _col0 (type: string)
-                        Statistics: Num rows: 26 Data size: 7488 Basic stats: COMPLETE Column stats: COMPLETE
-                        value expressions: _col2 (type: string)
             Execution mode: llap
             LLAP IO: no inputs
-        Reducer 10 
-            Execution mode: llap
-            Reduce Operator Tree:
-              Group By Operator
-                keys: KEY._col0 (type: string), KEY._col1 (type: string)
-                mode: mergepartial
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
-                Filter Operator
-                  predicate: _col0 is not null (type: boolean)
-                  Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
-                  Select Operator
-                    expressions: _col0 (type: string), _col1 (type: string), true (type: boolean)
-                    outputColumnNames: _col0, _col1, _col2
-                    Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: string), _col1 (type: string)
-                      sort order: ++
-                      Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
-                      Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
-                      value expressions: _col2 (type: boolean)
         Reducer 2 
             Execution mode: llap
             Reduce Operator Tree:
@@ -2354,6 +2298,16 @@ STAGE PLANS:
                     Map-reduce partition columns: _col0 (type: string)
                     Statistics: Num rows: 7 Data size: 840 Basic stats: COMPLETE Column stats: COMPLETE
                     value expressions: _col1 (type: bigint), _col2 (type: bigint)
+                Group By Operator
+                  keys: _col2 (type: string), _col1 (type: string)
+                  mode: hash
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string), _col1 (type: string)
+                    sort order: ++
+                    Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                    Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
         Reducer 8 
             Execution mode: llap
             Reduce Operator Tree:
@@ -2372,24 +2326,24 @@ STAGE PLANS:
         Reducer 9 
             Execution mode: llap
             Reduce Operator Tree:
-              Merge Join Operator
-                condition map:
-                     Left Semi Join 0 to 1
-                keys:
-                  0 _col1 (type: string), _col0 (type: string)
-                  1 _col0 (type: string), _col1 (type: string)
-                outputColumnNames: _col1, _col2
-                Statistics: Num rows: 14 Data size: 2744 Basic stats: COMPLETE Column stats: COMPLETE
-                Group By Operator
-                  keys: _col2 (type: string), _col1 (type: string)
-                  mode: hash
-                  outputColumnNames: _col0, _col1
+              Group By Operator
+                keys: KEY._col0 (type: string), KEY._col1 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
+                Filter Operator
+                  predicate: _col0 is not null (type: boolean)
                   Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
-                  Reduce Output Operator
-                    key expressions: _col0 (type: string), _col1 (type: string)
-                    sort order: ++
-                    Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
-                    Statistics: Num rows: 7 Data size: 1372 Basic stats: COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    expressions: _col0 (type: string), _col1 (type: string), true (type: boolean)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: string), _col1 (type: string)
+                      sort order: ++
+                      Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
+                      Statistics: Num rows: 7 Data size: 1400 Basic stats: COMPLETE Column stats: COMPLETE
+                      value expressions: _col2 (type: boolean)
 
   Stage: Stage-0
     Fetch Operator
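[Editor's note] The same collapse applies to the scalar-subquery plan in the next hunk: the duplicated avg() aggregation vertex (Reducer 11) disappears, Reducer 8 now reads Reducer 10 instead, and Map 9 drops its second identical Reduce Output Operator.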
@@ -2855,12 +2809,11 @@ STAGE PLANS:
 #### A masked pattern was here ####
       Edges:
         Reducer 10 <- Map 9 (CUSTOM_SIMPLE_EDGE)
-        Reducer 11 <- Map 9 (CUSTOM_SIMPLE_EDGE)
         Reducer 2 <- Map 1 (SIMPLE_EDGE)
         Reducer 3 <- Map 5 (SIMPLE_EDGE), Reducer 2 (ONE_TO_ONE_EDGE)
         Reducer 4 <- Reducer 3 (SIMPLE_EDGE), Reducer 8 (SIMPLE_EDGE)
         Reducer 7 <- Map 6 (XPROD_EDGE), Reducer 10 (XPROD_EDGE)
-        Reducer 8 <- Reducer 11 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
+        Reducer 8 <- Reducer 10 (SIMPLE_EDGE), Reducer 7 (SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -2939,10 +2892,6 @@ STAGE PLANS:
                         sort order: 
                         Statistics: Num rows: 1 Data size: 80 Basic stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col0 (type: struct<count:bigint,sum:double,input:double>)
-                      Reduce Output Operator
-                        sort order: 
-                        Statistics: Num rows: 1 Data size: 80 Basic stats: COMPLETE Column stats: COMPLETE
-                        value expressions: _col0 (type: struct<count:bigint,sum:double,input:double>)
             Execution mode: llap
             LLAP IO: no inputs
         Reducer 10 
@@ -2962,14 +2911,6 @@ STAGE PLANS:
                     sort order: 
                     Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE
                     value expressions: _col0 (type: bigint), _col1 (type: bigint)
-        Reducer 11 
-            Execution mode: llap
-            Reduce Operator Tree:
-              Group By Operator
-                aggregations: avg(VALUE._col0)
-                mode: mergepartial
-                outputColumnNames: _col0
-                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                 Select Operator
                   expressions: _col0 (type: double), true (type: boolean)
                   outputColumnNames: _col0, _col1