Posted to commits@hive.apache.org by li...@apache.org on 2018/04/25 07:47:39 UTC

hive git commit: HIVE-17193: HoS: don't combine map works that are targets of different DPPs (Rui reviewed by Sahil)

Repository: hive
Updated Branches:
  refs/heads/master 34ced3062 -> 391ff7e22


HIVE-17193: HoS: don't combine map works that are targets of different DPPs (Rui reviewed by Sahil)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/391ff7e2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/391ff7e2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/391ff7e2

Branch: refs/heads/master
Commit: 391ff7e22e98be6b1ebd7d66206b4403f47150ec
Parents: 34ced30
Author: Rui Li <li...@apache.org>
Authored: Wed Apr 25 15:47:07 2018 +0800
Committer: Rui Li <li...@apache.org>
Committed: Wed Apr 25 15:47:07 2018 +0800

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |   1 +
 .../exec/spark/SparkDynamicPartitionPruner.java |   3 +-
 .../spark/CombineEquivalentWorkResolver.java    | 165 ++++++----
 .../spark/SparkPartitionPruningSinkDesc.java    |  15 +
 .../hive/ql/parse/spark/GenSparkUtils.java      |   3 +-
 .../spark_dynamic_partition_pruning_7.q         |  22 ++
 .../spark_dynamic_partition_pruning_7.q.out     | 329 +++++++++++++++++++
 7 files changed, 464 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/391ff7e2/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 2c1a76d..1a34659 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1579,6 +1579,7 @@ miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\
   spark_dynamic_partition_pruning_4.q,\
   spark_dynamic_partition_pruning_5.q,\
   spark_dynamic_partition_pruning_6.q,\
+  spark_dynamic_partition_pruning_7.q,\
   spark_dynamic_partition_pruning_mapjoin_only.q,\
   spark_constprog_dpp.q,\
   spark_dynamic_partition_pruning_recursive_mapjoin.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/391ff7e2/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
index ed889fa..240fa09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
@@ -32,6 +32,7 @@ import java.util.Set;
 
 import com.clearspring.analytics.util.Preconditions;
 import javolution.testing.AssertionException;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -174,7 +175,7 @@ public class SparkDynamicPartitionPruner {
       throws HiveException {
     Set<Object> values = info.values;
     // strip the targetId from the column name
-    String columnName = info.columnName.substring(info.columnName.indexOf(':') + 1);
+    String columnName = SparkPartitionPruningSinkDesc.stripOffTargetId(info.columnName);
 
     ObjectInspector oi =
         PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory

http://git-wip-us.apache.org/repos/asf/hive/blob/391ff7e2/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
index 74f0368..c681c74 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -18,25 +18,30 @@
 
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
+import java.util.stream.Collectors;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.OperatorUtils;
-import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -44,7 +49,6 @@ import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalPlanResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -61,14 +65,14 @@ import org.apache.hadoop.hive.ql.plan.SparkWork;
  */
 public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
   protected static transient Logger LOG = LoggerFactory.getLogger(CombineEquivalentWorkResolver.class);
-  private List<String> removedMapWorkNames = new ArrayList<String>();
   private PhysicalContext pctx;
   @Override
   public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
     this.pctx = pctx;
     List<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pctx.getRootTasks());
-    TaskGraphWalker taskWalker = new TaskGraphWalker(new EquivalentWorkMatcher());
+    // use a pre-order walker so that DPP sink works are visited (and combined) first
+    GraphWalker taskWalker = new PreOrderWalker(new EquivalentWorkMatcher());
     HashMap<Node, Object> nodeOutput = Maps.newHashMap();
     taskWalker.startWalking(topNodes, nodeOutput);
     return pctx;
@@ -82,25 +86,19 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
       }
     };
 
-    // maps from a work to the DPPs it contains
+    // maps from a work to the DPPs it contains -- used to combine equivalent DPP sinks
     private Map<BaseWork, List<SparkPartitionPruningSinkOperator>> workToDpps = new HashMap<>();
 
+    // maps from unique id to DPP sink -- used to update the DPP sinks when
+    // target map works are removed
+    private Map<String, SparkPartitionPruningSinkOperator> idToDpps = new HashMap<>();
+
     @Override
     public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs) throws SemanticException {
       if (nd instanceof SparkTask) {
         SparkTask sparkTask = (SparkTask) nd;
         SparkWork sparkWork = sparkTask.getWork();
-        // For dpp case, dpp sink will appear in Task1 and the target work of dpp sink will appear in Task2.
-        // Task2 is the child task of Task1. Task2 will be traversed before task1 because TaskGraphWalker will first
-        // put children task in the front of task queue.
-        // If a spark work which is equal to other is found and removed in Task2, the dpp sink can be removed when Task1
-        // is traversed(More detailed see HIVE-16948)
-        if (removedMapWorkNames.size() > 0) {
-          removeDynamicPartitionPruningSink(removedMapWorkNames, sparkWork);
-          if (sparkWork.getAllWork().size() == 0) {
-            removeEmptySparkTask(sparkTask);
-          }
-        }
+        collectDPPInfos(sparkWork);
 
         Set<BaseWork> roots = sparkWork.getRoots();
         compareWorksRecursively(roots, sparkWork);
@@ -108,6 +106,19 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
       return null;
     }
 
+    private void collectDPPInfos(SparkWork sparkWork) {
+      for (BaseWork work : sparkWork.getAllWork()) {
+        Set<Operator<?>> seen = new HashSet<>();
+        for (Operator root : work.getAllRootOperators()) {
+          List<SparkPartitionPruningSinkOperator> sinks = new ArrayList<>();
+          SparkUtilities.collectOp(root, SparkPartitionPruningSinkOperator.class, sinks, seen);
+          for (SparkPartitionPruningSinkOperator sink : sinks) {
+            idToDpps.put(sink.getUniqueId(), sink);
+          }
+        }
+      }
+    }
+
     private void compareWorksRecursively(Set<BaseWork> works, SparkWork sparkWork) {
       workToDpps.clear();
       // find out all equivalent works in the Set.
@@ -173,6 +184,7 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
               // equivalent works must have dpp lists of same size
               for (int i = 0; i < dppList1.size(); i++) {
                 combineEquivalentDPPSinks(dppList1.get(i), dppList2.get(i));
+                idToDpps.remove(dppList2.get(i).getUniqueId());
               }
             }
             replaceWork(next, first, sparkWork);
@@ -203,7 +215,22 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
       sparkWork.remove(previous);
       // In order to fix HIVE-16948
       if (previous instanceof MapWork) {
-        removedMapWorkNames.add(previous.getName());
+        removeTargetFromDPP((MapWork) previous);
+      }
+    }
+
+    // Remove the map work from DPP sinks that have it as a target
+    private void removeTargetFromDPP(MapWork target) {
+      Set<String> dppIds = target.getEventSourceColumnNameMap().keySet();
+      for (String dppId : dppIds) {
+        SparkPartitionPruningSinkOperator sink = idToDpps.get(dppId);
+        Preconditions.checkNotNull(sink, "Unable to find DPP sink whose target work is removed.");
+        SparkPartitionPruningSinkDesc desc = sink.getConf();
+        desc.removeTarget(target.getName());
+        // The target is removed only because an equivalent MapWork shares the same
+        // DPP sink, so it cannot have been the sink's only target.
+        Preconditions.checkState(!desc.getTargetInfos().isEmpty(),
+            "The removed target work is the only target.");
       }
     }
 
@@ -279,6 +306,10 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
     }
 
     private boolean compareMapWork(MapWork first, MapWork second) {
+      return hasSamePathToPartition(first, second) && targetsOfSameDPPSink(first, second);
+    }
+
+    private boolean hasSamePathToPartition(MapWork first, MapWork second) {
       Map<Path, PartitionDesc> pathToPartition1 = first.getPathToPartitionInfo();
       Map<Path, PartitionDesc> pathToPartition2 = second.getPathToPartitionInfo();
       if (pathToPartition1.size() == pathToPartition2.size()) {
@@ -295,6 +326,49 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
       return false;
     }
 
+    // Checks whether two MapWorks will be pruned by the same DPP sink
+    private boolean targetsOfSameDPPSink(MapWork first, MapWork second) {
+      Set<String> sources1 = first.getEventSourceColumnNameMap().keySet();
+      Set<String> sources2 = second.getEventSourceColumnNameMap().keySet();
+      if (!sources1.equals(sources2)) {
+        return false;
+      }
+      // check whether each DPP sink targets the same columns
+      for (String source : sources1) {
+        Set<String> names1 = first.getEventSourceColumnNameMap().get(source).stream().map(
+            SparkPartitionPruningSinkDesc::stripOffTargetId).collect(Collectors.toSet());
+        Set<String> names2 = second.getEventSourceColumnNameMap().get(source).stream().map(
+            SparkPartitionPruningSinkDesc::stripOffTargetId).collect(Collectors.toSet());
+        if (!names1.equals(names2)) {
+          return false;
+        }
+
+        Set<String> types1 = new HashSet<>(first.getEventSourceColumnTypeMap().get(source));
+        Set<String> types2 = new HashSet<>(second.getEventSourceColumnTypeMap().get(source));
+        if (!types1.equals(types2)) {
+          return false;
+        }
+
+        Set<TableDesc> tableDescs1 = new HashSet<>(first.getEventSourceTableDescMap().get(source));
+        Set<TableDesc> tableDescs2 = new HashSet<>(second.getEventSourceTableDescMap().get(source));
+        if (!tableDescs1.equals(tableDescs2)) {
+          return false;
+        }
+
+        List<ExprNodeDesc> descs1 = first.getEventSourcePartKeyExprMap().get(source);
+        List<ExprNodeDesc> descs2 = second.getEventSourcePartKeyExprMap().get(source);
+        if (descs1.size() != descs2.size()) {
+          return false;
+        }
+        for (ExprNodeDesc desc : descs1) {
+          if (descs2.stream().noneMatch(d -> ExprNodeDescUtils.isSame(d, desc))) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+
     private boolean hasSameParent(BaseWork first, BaseWork second, SparkWork sparkWork) {
       boolean result = true;
       List<BaseWork> firstParents = sparkWork.getParents(first);
@@ -358,59 +432,6 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
     private boolean compareCurrentOperator(Operator<?> firstOperator, Operator<?> secondOperator) {
       return firstOperator.logicalEquals(secondOperator);
     }
-
-    /**
-     * traverse the children in sparkWork to find the dpp sink operator which target work is included in
-     * removedMapWorkList
-     * If there is branch, remove prune sink operator branch in the BaseWork
-     * If there is no branch, remove the whole BaseWork
-     *
-     * @param removedMapWorkList: the name of the map work has been deleted because they are equals to other works.
-     * @param sparkWork:          current spark work
-     */
-    private void removeDynamicPartitionPruningSink(List<String> removedMapWorkList, SparkWork sparkWork) {
-      List<BaseWork> allWorks = sparkWork.getAllWork();
-      for (BaseWork baseWork : allWorks) {
-        Set<Operator<?>> rootOperators = baseWork.getAllRootOperators();
-        for (Operator root : rootOperators) {
-          List<Operator<?>> pruningList = new ArrayList<>();
-          SparkUtilities.collectOp(pruningList, root, SparkPartitionPruningSinkOperator.class);
-          for (Operator pruneSinkOp : pruningList) {
-            SparkPartitionPruningSinkOperator sparkPruneSinkOp = (SparkPartitionPruningSinkOperator) pruneSinkOp;
-            for (String removedName : removedMapWorkList) {
-              sparkPruneSinkOp.getConf().removeTarget(removedName);
-            }
-            if (sparkPruneSinkOp.getConf().getTargetInfos().isEmpty()) {
-              LOG.debug("ready to remove the sparkPruneSinkOp which target work is " +
-                  sparkPruneSinkOp.getConf().getTargetWorks() + " because the MapWork is equals to other map work and " +
-                  "has been deleted!");
-              // If there is branch, remove prune sink operator branch in the baseWork
-              // If there is no branch, remove the whole baseWork
-              if (OperatorUtils.isInBranch(sparkPruneSinkOp)) {
-                OperatorUtils.removeBranch(sparkPruneSinkOp);
-              } else {
-                sparkWork.remove(baseWork);
-              }
-            }
-          }
-        }
-      }
-    }
-
-    private void removeEmptySparkTask(SparkTask currTask) {
-      // If currTask is rootTasks, remove it and add its children to the rootTasks which currTask is its only parent
-      // task
-      if (pctx.getRootTasks().contains(currTask)) {
-        pctx.removeFromRootTask(currTask);
-        List<Task<? extends Serializable>> newRoots = currTask.getChildTasks();
-        for (Task newRoot : newRoots) {
-          if (newRoot.getParentTasks().size() == 1) {
-            pctx.addToRootTask(newRoot);
-          }
-        }
-      }
-      SparkUtilities.removeEmptySparkTask(currTask);
-    }
   }
 
   // Merge the target works of the second DPP sink into the first DPP sink.
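
For context: the new targetsOfSameDPPSink check means two MapWorks are only
combinable when they are pruned by the same DPP sources, on the same target
columns (after stripping the "workId:" prefix), with matching column types,
table descriptors, and partition-key expressions. Below is a minimal,
self-contained Java sketch of the source/column portion of that rule; the
class and the toy data are hypothetical stand-ins, not part of the patch.

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DppCombineSketch {
  // Mirrors the first two checks in targetsOfSameDPPSink: the works must be
  // pruned by the same sources, and each source must prune the same columns.
  static boolean combinable(Map<String, List<String>> a, Map<String, List<String>> b) {
    if (!a.keySet().equals(b.keySet())) {
      return false; // pruned by different DPP sinks
    }
    for (String source : a.keySet()) {
      // compare column names with the "workId:" prefix stripped off
      if (!stripIds(a.get(source)).equals(stripIds(b.get(source)))) {
        return false; // same sink, but it prunes different target columns
      }
    }
    return true;
  }

  static Set<String> stripIds(List<String> names) {
    Set<String> out = new HashSet<>();
    for (String name : names) {
      int idx = name.indexOf(':');
      out.add(idx == -1 ? name : name.substring(idx + 1));
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, List<String>> w1 = Map.of("Map 7", List.of("1:ds"));
    Map<String, List<String>> w2 = Map.of("Map 8", List.of("4:ds"));
    Map<String, List<String>> w3 = Map.of("Map 7", List.of("4:hr"));
    Map<String, List<String>> w4 = Map.of("Map 7", List.of("4:ds"));
    System.out.println(combinable(w1, w2)); // false: different DPP sources
    System.out.println(combinable(w1, w3)); // false: same source, different columns
    System.out.println(combinable(w1, w4)); // true: ids differ, columns match
  }
}

The two qfile queries added below exercise exactly these cases: in the first,
the srcpart scans are pruned by different sources (Map 7 vs Map 8); in the
second, they share a source but are pruned on different columns (ds vs hr).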

http://git-wip-us.apache.org/repos/asf/hive/blob/391ff7e2/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
index 1607a3f..78d4a41 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.optimizer.spark;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
 import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Explain;
@@ -141,4 +142,18 @@ public class SparkPartitionPruningSinkDesc extends AbstractOperatorDesc {
     }
     targetInfos.removeAll(toRemove);
   }
+
+  // Return a combined column name with the corresponding target map work ID.
+  public static String colNameWithTargetId(MapWork target, String colName) {
+    return SparkUtilities.getWorkId(target) + ":" + colName;
+  }
+
+  // Return the column from a combined column name.
+  public static String stripOffTargetId(String name) {
+    int idx = name.indexOf(":");
+    if (idx != -1) {
+      return name.substring(idx + 1);
+    }
+    return name;
+  }
 }
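
The two helpers above centralize the "workId:column" encoding that
GenSparkUtils (next hunk) and SparkDynamicPartitionPruner now share. A small
round-trip illustration, with the caveat that the class name and the literal
work id are hypothetical; in the patch the id comes from
SparkUtilities.getWorkId(target):

public class TargetIdEncoding {
  // Hypothetical stand-in for SparkUtilities.getWorkId(target) + ":" + colName
  static String colNameWithTargetId(String workId, String colName) {
    return workId + ":" + colName;
  }

  // Same logic as SparkPartitionPruningSinkDesc.stripOffTargetId above
  static String stripOffTargetId(String name) {
    int idx = name.indexOf(':');
    return idx == -1 ? name : name.substring(idx + 1);
  }

  public static void main(String[] args) {
    String combined = colNameWithTargetId("4", "ds");
    System.out.println(combined);                   // prints: 4:ds
    System.out.println(stripOffTargetId(combined)); // prints: ds
  }
}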

http://git-wip-us.apache.org/repos/asf/hive/blob/391ff7e2/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 900a800..757cb7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -475,7 +475,8 @@ public class GenSparkUtils {
       }
 
       targetInfo.work = targetWork;
-      targetInfo.columnName = SparkUtilities.getWorkId(targetWork) + ":" + targetInfo.columnName;
+      targetInfo.columnName = SparkPartitionPruningSinkDesc.colNameWithTargetId(
+          targetWork, targetInfo.columnName);
 
       pruningSink.addAsSourceEvent(targetWork, targetInfo.partKey, targetInfo.columnName,
           targetInfo.columnType);

http://git-wip-us.apache.org/repos/asf/hive/blob/391ff7e2/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_7.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_7.q b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_7.q
new file mode 100644
index 0000000..437dea1
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_7.q
@@ -0,0 +1,22 @@
+--! qt:dataset:src
+--! qt:dataset:srcpart
+set hive.spark.dynamic.partition.pruning=true;
+
+-- SORT_QUERY_RESULTS
+
+-- This qfile tests that MapWorks won't be combined if they're targets of different DPP sinks
+
+-- MapWorks for srcpart shouldn't be combined because they're pruned by different DPP sinks
+explain
+select * from
+  (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.key) a
+union all
+  (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.value);
+
+-- MapWorks for srcpart shouldn't be combined because although they're pruned by the same DPP sink,
+-- the target columns are different.
+explain
+select * from
+  (select srcpart.ds,srcpart.hr,srcpart.key from srcpart join src on srcpart.ds=src.key) a
+union all
+  (select srcpart.ds,srcpart.hr,srcpart.key from srcpart join src on srcpart.hr=src.key);

http://git-wip-us.apache.org/repos/asf/hive/blob/391ff7e2/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_7.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_7.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_7.q.out
new file mode 100644
index 0000000..1b8c06c
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_7.q.out
@@ -0,0 +1,329 @@
+PREHOOK: query: explain
+select * from
+  (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.key) a
+union all
+  (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.value)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select * from
+  (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.key) a
+union all
+  (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.value)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-2
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 7 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          keys: _col0 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                          Spark Partition Pruning Sink Operator
+                            Target Columns: [Map 1 -> [ds:string (ds)]]
+                            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+        Map 8 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: value is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: value (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          keys: _col0 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                          Spark Partition Pruning Sink Operator
+                            Target Columns: [Map 4 -> [ds:string (ds)]]
+                            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 4), Map 3 (PARTITION-LEVEL SORT, 4)
+        Reducer 5 <- Map 4 (PARTITION-LEVEL SORT, 4), Map 6 (PARTITION-LEVEL SORT, 4)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart
+                  Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), ds (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col1 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col1 (type: string)
+                      Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: string)
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart
+                  Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), ds (type: string)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col1 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col1 (type: string)
+                      Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: string)
+        Map 6 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: value is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: value (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+        Reducer 2 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col1 (type: string), _col0 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 4400 Data size: 46744 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 5 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col1 (type: string), _col0 (type: string)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 4400 Data size: 46744 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: explain
+select * from
+  (select srcpart.ds,srcpart.hr,srcpart.key from srcpart join src on srcpart.ds=src.key) a
+union all
+  (select srcpart.ds,srcpart.hr,srcpart.key from srcpart join src on srcpart.hr=src.key)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select * from
+  (select srcpart.ds,srcpart.hr,srcpart.key from srcpart join src on srcpart.ds=src.key) a
+union all
+  (select srcpart.ds,srcpart.hr,srcpart.key from srcpart join src on srcpart.hr=src.key)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-1 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-2
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 7 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Select Operator
+                        expressions: _col0 (type: string)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        Group By Operator
+                          keys: _col0 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                          Spark Partition Pruning Sink Operator
+                            Target Columns: [Map 1 -> [ds:string (ds)], Map 4 -> [hr:string (hr)]]
+                            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 4), Map 3 (PARTITION-LEVEL SORT, 4)
+        Reducer 5 <- Map 3 (PARTITION-LEVEL SORT, 4), Map 4 (PARTITION-LEVEL SORT, 4)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart
+                  Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), ds (type: string), hr (type: string)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col1 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col1 (type: string)
+                      Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: string), _col2 (type: string)
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: string)
+                      outputColumnNames: _col0
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  alias: srcpart
+                  Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: key (type: string), ds (type: string), hr (type: string)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col2 (type: string)
+                      sort order: +
+                      Map-reduce partition columns: _col2 (type: string)
+                      Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: _col0 (type: string), _col1 (type: string)
+        Reducer 2 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col1 (type: string), _col2 (type: string), _col0 (type: string)
+                  outputColumnNames: _col0, _col1, _col2
+                  Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 4400 Data size: 46744 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+        Reducer 5 
+            Reduce Operator Tree:
+              Join Operator
+                condition map:
+                     Inner Join 0 to 1
+                outputColumnNames: _col0, _col1, _col2
+                Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: _col1 (type: string), _col2 (type: string), _col0 (type: string)
+                  outputColumnNames: _col0, _col1, _col2
+                  Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 4400 Data size: 46744 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+