You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by li...@apache.org on 2018/04/25 07:47:39 UTC
hive git commit: HIVE-17193: HoS: don't combine map works that are
targets of different DPPs (Rui reviewed by Sahil)
Repository: hive
Updated Branches:
refs/heads/master 34ced3062 -> 391ff7e22
HIVE-17193: HoS: don't combine map works that are targets of different DPPs (Rui reviewed by Sahil)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/391ff7e2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/391ff7e2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/391ff7e2
Branch: refs/heads/master
Commit: 391ff7e22e98be6b1ebd7d66206b4403f47150ec
Parents: 34ced30
Author: Rui Li <li...@apache.org>
Authored: Wed Apr 25 15:47:07 2018 +0800
Committer: Rui Li <li...@apache.org>
Committed: Wed Apr 25 15:47:07 2018 +0800
----------------------------------------------------------------------
.../test/resources/testconfiguration.properties | 1 +
.../exec/spark/SparkDynamicPartitionPruner.java | 3 +-
.../spark/CombineEquivalentWorkResolver.java | 165 ++++++----
.../spark/SparkPartitionPruningSinkDesc.java | 15 +
.../hive/ql/parse/spark/GenSparkUtils.java | 3 +-
.../spark_dynamic_partition_pruning_7.q | 22 ++
.../spark_dynamic_partition_pruning_7.q.out | 329 +++++++++++++++++++
7 files changed, 464 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/391ff7e2/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 2c1a76d..1a34659 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1579,6 +1579,7 @@ miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\
spark_dynamic_partition_pruning_4.q,\
spark_dynamic_partition_pruning_5.q,\
spark_dynamic_partition_pruning_6.q,\
+ spark_dynamic_partition_pruning_7.q,\
spark_dynamic_partition_pruning_mapjoin_only.q,\
spark_constprog_dpp.q,\
spark_dynamic_partition_pruning_recursive_mapjoin.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/391ff7e2/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
index ed889fa..240fa09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
@@ -32,6 +32,7 @@ import java.util.Set;
import com.clearspring.analytics.util.Preconditions;
import javolution.testing.AssertionException;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
@@ -174,7 +175,7 @@ public class SparkDynamicPartitionPruner {
throws HiveException {
Set<Object> values = info.values;
// strip the column name of the targetId
- String columnName = info.columnName.substring(info.columnName.indexOf(':') + 1);
+ String columnName = SparkPartitionPruningSinkDesc.stripOffTargetId(info.columnName);
ObjectInspector oi =
PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory
http://git-wip-us.apache.org/repos/asf/hive/blob/391ff7e2/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
index 74f0368..c681c74 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -18,25 +18,30 @@
package org.apache.hadoop.hive.ql.optimizer.spark;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
+import java.util.stream.Collectors;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.OperatorUtils;
-import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -44,7 +49,6 @@ import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.Node;
-import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalPlanResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -61,14 +65,14 @@ import org.apache.hadoop.hive.ql.plan.SparkWork;
*/
public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
protected static transient Logger LOG = LoggerFactory.getLogger(CombineEquivalentWorkResolver.class);
- private List<String> removedMapWorkNames = new ArrayList<String>();
private PhysicalContext pctx;
@Override
public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
this.pctx = pctx;
List<Node> topNodes = new ArrayList<Node>();
topNodes.addAll(pctx.getRootTasks());
- TaskGraphWalker taskWalker = new TaskGraphWalker(new EquivalentWorkMatcher());
+ // use a pre-order walker so that DPP sink works are visited (and combined) first
+ GraphWalker taskWalker = new PreOrderWalker(new EquivalentWorkMatcher());
HashMap<Node, Object> nodeOutput = Maps.newHashMap();
taskWalker.startWalking(topNodes, nodeOutput);
return pctx;
@@ -82,25 +86,19 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
}
};
- // maps from a work to the DPPs it contains
+ // maps from a work to the DPPs it contains -- used to combine equivalent DPP sinks
private Map<BaseWork, List<SparkPartitionPruningSinkOperator>> workToDpps = new HashMap<>();
+ // maps from unique id to DPP sink -- used to update the DPP sinks when
+ // target map works are removed
+ private Map<String, SparkPartitionPruningSinkOperator> idToDpps = new HashMap<>();
+
@Override
public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs) throws SemanticException {
if (nd instanceof SparkTask) {
SparkTask sparkTask = (SparkTask) nd;
SparkWork sparkWork = sparkTask.getWork();
- // For dpp case, dpp sink will appear in Task1 and the target work of dpp sink will appear in Task2.
- // Task2 is the child task of Task1. Task2 will be traversed before task1 because TaskGraphWalker will first
- // put children task in the front of task queue.
- // If a spark work which is equal to other is found and removed in Task2, the dpp sink can be removed when Task1
- // is traversed(More detailed see HIVE-16948)
- if (removedMapWorkNames.size() > 0) {
- removeDynamicPartitionPruningSink(removedMapWorkNames, sparkWork);
- if (sparkWork.getAllWork().size() == 0) {
- removeEmptySparkTask(sparkTask);
- }
- }
+ collectDPPInfos(sparkWork);
Set<BaseWork> roots = sparkWork.getRoots();
compareWorksRecursively(roots, sparkWork);
@@ -108,6 +106,19 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
return null;
}
+ private void collectDPPInfos(SparkWork sparkWork) {
+ for (BaseWork work : sparkWork.getAllWork()) {
+ Set<Operator<?>> seen = new HashSet<>();
+ for (Operator root : work.getAllRootOperators()) {
+ List<SparkPartitionPruningSinkOperator> sinks = new ArrayList<>();
+ SparkUtilities.collectOp(root, SparkPartitionPruningSinkOperator.class, sinks, seen);
+ for (SparkPartitionPruningSinkOperator sink : sinks) {
+ idToDpps.put(sink.getUniqueId(), sink);
+ }
+ }
+ }
+ }
+
private void compareWorksRecursively(Set<BaseWork> works, SparkWork sparkWork) {
workToDpps.clear();
// find out all equivalent works in the Set.
@@ -173,6 +184,7 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
// equivalent works must have dpp lists of same size
for (int i = 0; i < dppList1.size(); i++) {
combineEquivalentDPPSinks(dppList1.get(i), dppList2.get(i));
+ idToDpps.remove(dppList2.get(i).getUniqueId());
}
}
replaceWork(next, first, sparkWork);
@@ -203,7 +215,22 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
sparkWork.remove(previous);
// In order to fix HIVE-16948
if (previous instanceof MapWork) {
- removedMapWorkNames.add(previous.getName());
+ removeTargetFromDPP((MapWork) previous);
+ }
+ }
+
+ // Remove the map work from DPP sinks that have it as a target
+ private void removeTargetFromDPP(MapWork target) {
+ Set<String> dppIds = target.getEventSourceColumnNameMap().keySet();
+ for (String dppId : dppIds) {
+ SparkPartitionPruningSinkOperator sink = idToDpps.get(dppId);
+ Preconditions.checkNotNull(sink, "Unable to find DPP sink whose target work is removed.");
+ SparkPartitionPruningSinkDesc desc = sink.getConf();
+ desc.removeTarget(target.getName());
+ // If the target can be removed, it means there's another MapWork that shares the same
+ // DPP sink, and therefore it cannot be the only target.
+ Preconditions.checkState(!desc.getTargetInfos().isEmpty(),
+ "The removed target work is the only target.");
}
}
@@ -279,6 +306,10 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
}
private boolean compareMapWork(MapWork first, MapWork second) {
+ return hasSamePathToPartition(first, second) && targetsOfSameDPPSink(first, second);
+ }
+
+ private boolean hasSamePathToPartition(MapWork first, MapWork second) {
Map<Path, PartitionDesc> pathToPartition1 = first.getPathToPartitionInfo();
Map<Path, PartitionDesc> pathToPartition2 = second.getPathToPartitionInfo();
if (pathToPartition1.size() == pathToPartition2.size()) {
@@ -295,6 +326,49 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
return false;
}
+ // Checks whether two MapWorks will be pruned by the same DPP sink
+ private boolean targetsOfSameDPPSink(MapWork first, MapWork second) {
+ Set<String> sources1 = first.getEventSourceColumnNameMap().keySet();
+ Set<String> sources2 = second.getEventSourceColumnNameMap().keySet();
+ if (!sources1.equals(sources2)) {
+ return false;
+ }
+ // check whether each DPP sink target same columns
+ for (String source : sources1) {
+ Set<String> names1 = first.getEventSourceColumnNameMap().get(source).stream().map(
+ SparkPartitionPruningSinkDesc::stripOffTargetId).collect(Collectors.toSet());
+ Set<String> names2 = second.getEventSourceColumnNameMap().get(source).stream().map(
+ SparkPartitionPruningSinkDesc::stripOffTargetId).collect(Collectors.toSet());
+ if (!names1.equals(names2)) {
+ return false;
+ }
+
+ Set<String> types1 = new HashSet<>(first.getEventSourceColumnTypeMap().get(source));
+ Set<String> types2 = new HashSet<>(second.getEventSourceColumnTypeMap().get(source));
+ if (!types1.equals(types2)) {
+ return false;
+ }
+
+ Set<TableDesc> tableDescs1 = new HashSet<>(first.getEventSourceTableDescMap().get(source));
+ Set<TableDesc> tableDescs2 = new HashSet<>(second.getEventSourceTableDescMap().get(source));
+ if (!tableDescs1.equals(tableDescs2)) {
+ return false;
+ }
+
+ List<ExprNodeDesc> descs1 = first.getEventSourcePartKeyExprMap().get(source);
+ List<ExprNodeDesc> descs2 = second.getEventSourcePartKeyExprMap().get(source);
+ if (descs1.size() != descs2.size()) {
+ return false;
+ }
+ for (ExprNodeDesc desc : descs1) {
+ if (descs2.stream().noneMatch(d -> ExprNodeDescUtils.isSame(d, desc))) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
private boolean hasSameParent(BaseWork first, BaseWork second, SparkWork sparkWork) {
boolean result = true;
List<BaseWork> firstParents = sparkWork.getParents(first);
@@ -358,59 +432,6 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
private boolean compareCurrentOperator(Operator<?> firstOperator, Operator<?> secondOperator) {
return firstOperator.logicalEquals(secondOperator);
}
-
- /**
- * traverse the children in sparkWork to find the dpp sink operator which target work is included in
- * removedMapWorkList
- * If there is branch, remove prune sink operator branch in the BaseWork
- * If there is no branch, remove the whole BaseWork
- *
- * @param removedMapWorkList: the name of the map work has been deleted because they are equals to other works.
- * @param sparkWork: current spark work
- */
- private void removeDynamicPartitionPruningSink(List<String> removedMapWorkList, SparkWork sparkWork) {
- List<BaseWork> allWorks = sparkWork.getAllWork();
- for (BaseWork baseWork : allWorks) {
- Set<Operator<?>> rootOperators = baseWork.getAllRootOperators();
- for (Operator root : rootOperators) {
- List<Operator<?>> pruningList = new ArrayList<>();
- SparkUtilities.collectOp(pruningList, root, SparkPartitionPruningSinkOperator.class);
- for (Operator pruneSinkOp : pruningList) {
- SparkPartitionPruningSinkOperator sparkPruneSinkOp = (SparkPartitionPruningSinkOperator) pruneSinkOp;
- for (String removedName : removedMapWorkList) {
- sparkPruneSinkOp.getConf().removeTarget(removedName);
- }
- if (sparkPruneSinkOp.getConf().getTargetInfos().isEmpty()) {
- LOG.debug("ready to remove the sparkPruneSinkOp which target work is " +
- sparkPruneSinkOp.getConf().getTargetWorks() + " because the MapWork is equals to other map work and " +
- "has been deleted!");
- // If there is branch, remove prune sink operator branch in the baseWork
- // If there is no branch, remove the whole baseWork
- if (OperatorUtils.isInBranch(sparkPruneSinkOp)) {
- OperatorUtils.removeBranch(sparkPruneSinkOp);
- } else {
- sparkWork.remove(baseWork);
- }
- }
- }
- }
- }
- }
-
- private void removeEmptySparkTask(SparkTask currTask) {
- // If currTask is rootTasks, remove it and add its children to the rootTasks which currTask is its only parent
- // task
- if (pctx.getRootTasks().contains(currTask)) {
- pctx.removeFromRootTask(currTask);
- List<Task<? extends Serializable>> newRoots = currTask.getChildTasks();
- for (Task newRoot : newRoots) {
- if (newRoot.getParentTasks().size() == 1) {
- pctx.addToRootTask(newRoot);
- }
- }
- }
- SparkUtilities.removeEmptySparkTask(currTask);
- }
}
// Merge the target works of the second DPP sink into the first DPP sink.
http://git-wip-us.apache.org/repos/asf/hive/blob/391ff7e2/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
index 1607a3f..78d4a41 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.optimizer.spark;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.optimizer.signature.Signature;
import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
import org.apache.hadoop.hive.ql.plan.Explain;
@@ -141,4 +142,18 @@ public class SparkPartitionPruningSinkDesc extends AbstractOperatorDesc {
}
targetInfos.removeAll(toRemove);
}
+
+ // Return a combined column name with corresponding target map work ID.
+ public static String colNameWithTargetId(MapWork target, String colName) {
+ return SparkUtilities.getWorkId(target) + ":" + colName;
+ }
+
+ // Return the column from a combined column name.
+ public static String stripOffTargetId(String name) {
+ int idx = name.indexOf(":");
+ if (idx != -1) {
+ return name.substring(idx + 1);
+ }
+ return name;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/391ff7e2/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 900a800..757cb7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -475,7 +475,8 @@ public class GenSparkUtils {
}
targetInfo.work = targetWork;
- targetInfo.columnName = SparkUtilities.getWorkId(targetWork) + ":" + targetInfo.columnName;
+ targetInfo.columnName = SparkPartitionPruningSinkDesc.colNameWithTargetId(
+ targetWork, targetInfo.columnName);
pruningSink.addAsSourceEvent(targetWork, targetInfo.partKey, targetInfo.columnName,
targetInfo.columnType);
http://git-wip-us.apache.org/repos/asf/hive/blob/391ff7e2/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_7.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_7.q b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_7.q
new file mode 100644
index 0000000..437dea1
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_7.q
@@ -0,0 +1,22 @@
+--! qt:dataset:src
+--! qt:dataset:srcpart
+set hive.spark.dynamic.partition.pruning=true;
+
+-- SORT_QUERY_RESULTS
+
+-- This qfile tests MapWorks won't be combined if they're targets of different DPP sinks
+
+-- MapWorks for srcpart shouldn't be combined because they're pruned by different DPP sinks
+explain
+select * from
+ (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.key) a
+union all
+ (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.value);
+
+-- MapWorks for srcpart shouldn't be combined because although they're pruned by the same DPP sink,
+-- the target columns are different.
+explain
+select * from
+ (select srcpart.ds,srcpart.hr,srcpart.key from srcpart join src on srcpart.ds=src.key) a
+union all
+ (select srcpart.ds,srcpart.hr,srcpart.key from srcpart join src on srcpart.hr=src.key);
http://git-wip-us.apache.org/repos/asf/hive/blob/391ff7e2/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_7.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_7.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_7.q.out
new file mode 100644
index 0000000..1b8c06c
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_7.q.out
@@ -0,0 +1,329 @@
+PREHOOK: query: explain
+select * from
+ (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.key) a
+union all
+ (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.value)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select * from
+ (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.key) a
+union all
+ (select srcpart.ds,srcpart.key from srcpart join src on srcpart.ds=src.value)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-2 is a root stage
+ Stage-1 depends on stages: Stage-2
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-2
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 7
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ keys: _col0 (type: string)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Spark Partition Pruning Sink Operator
+ Target Columns: [Map 1 -> [ds:string (ds)]]
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Map 8
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: value is not null (type: boolean)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: value (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ keys: _col0 (type: string)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Spark Partition Pruning Sink Operator
+ Target Columns: [Map 4 -> [ds:string (ds)]]
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 4), Map 3 (PARTITION-LEVEL SORT, 4)
+ Reducer 5 <- Map 4 (PARTITION-LEVEL SORT, 4), Map 6 (PARTITION-LEVEL SORT, 4)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: srcpart
+ Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), ds (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col1 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col1 (type: string)
+ Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string)
+ Map 3
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: srcpart
+ Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), ds (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col1 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col1 (type: string)
+ Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string)
+ Map 6
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: value is not null (type: boolean)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: value (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Reducer 2
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col1 (type: string), _col0 (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 4400 Data size: 46744 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Reducer 5
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col1 (type: string), _col0 (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 4400 Data size: 46744 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: explain
+select * from
+ (select srcpart.ds,srcpart.hr,srcpart.key from srcpart join src on srcpart.ds=src.key) a
+union all
+ (select srcpart.ds,srcpart.hr,srcpart.key from srcpart join src on srcpart.hr=src.key)
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select * from
+ (select srcpart.ds,srcpart.hr,srcpart.key from srcpart join src on srcpart.ds=src.key) a
+union all
+ (select srcpart.ds,srcpart.hr,srcpart.key from srcpart join src on srcpart.hr=src.key)
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-2 is a root stage
+ Stage-1 depends on stages: Stage-2
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-2
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 7
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ keys: _col0 (type: string)
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Spark Partition Pruning Sink Operator
+ Target Columns: [Map 1 -> [ds:string (ds)], Map 4 -> [hr:string (hr)]]
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 4), Map 3 (PARTITION-LEVEL SORT, 4)
+ Reducer 5 <- Map 3 (PARTITION-LEVEL SORT, 4), Map 4 (PARTITION-LEVEL SORT, 4)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: srcpart
+ Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), ds (type: string), hr (type: string)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col1 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col1 (type: string)
+ Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string), _col2 (type: string)
+ Map 3
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string)
+ outputColumnNames: _col0
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: srcpart
+ Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), ds (type: string), hr (type: string)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col2 (type: string)
+ sort order: +
+ Map-reduce partition columns: _col2 (type: string)
+ Statistics: Num rows: 2000 Data size: 21248 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: string), _col1 (type: string)
+ Reducer 2
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col1 (type: string), _col2 (type: string), _col0 (type: string)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 4400 Data size: 46744 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Reducer 5
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col1 (type: string), _col2 (type: string), _col0 (type: string)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 2200 Data size: 23372 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 4400 Data size: 46744 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+