Posted to commits@hive.apache.org by xu...@apache.org on 2014/12/17 04:19:58 UTC
svn commit: r1646144 - in /hive/branches/spark/ql/src:
java/org/apache/hadoop/hive/ql/optimizer/
java/org/apache/hadoop/hive/ql/optimizer/physical/
java/org/apache/hadoop/hive/ql/optimizer/spark/
java/org/apache/hadoop/hive/ql/plan/ test/results/client...
Author: xuefu
Date: Wed Dec 17 03:19:57 2014
New Revision: 1646144
URL: http://svn.apache.org/r1646144
Log:
HIVE-9097: Support runtime skew join for more queries [Spark Branch] (Rui via Xuefu)
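
For context on the mechanism this commit extends: with hive.optimize.skewjoin enabled, the reduce-side join counts rows per join key at runtime, keys that exceed the configured threshold are written to a side file instead of being joined in place, and a conditional follow-up map-join task joins those rows afterwards. A minimal, self-contained sketch of that idea (toy threshold and in-memory lists standing in for Hive's HDFS side files; not Hive's operator code, and real Hive diverts all rows of a flagged key, not just the overflow as here):

    // A minimal sketch, assuming a toy threshold; in Hive the threshold is
    // hive.skewjoin.key and the "skewFile" list is a directory on HDFS.
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RuntimeSkewJoinSketch {
        private static final int SKEW_THRESHOLD = 2; // stand-in for hive.skewjoin.key

        public static void main(String[] args) {
            String[] joinKeys = {"a", "b", "b", "b", "c"};
            Map<String, Integer> rowsPerKey = new HashMap<>();
            List<String> reduceSideJoin = new ArrayList<>(); // joined in the normal reducer
            List<String> skewFile = new ArrayList<>();       // deferred to the conditional map join

            for (String key : joinKeys) {
                if (rowsPerKey.merge(key, 1, Integer::sum) > SKEW_THRESHOLD) {
                    skewFile.add(key);   // key turned out skewed at runtime
                } else {
                    reduceSideJoin.add(key);
                }
            }
            System.out.println("reduce-side join handles: " + reduceSideJoin);
            System.out.println("follow-up map join handles: " + skewFile);
        }
    }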
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin_noskew.q.out
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1646144&r1=1646143&r2=1646144&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Wed Dec 17 03:19:57 2014
@@ -983,7 +983,7 @@ public final class GenMapRedUtils {
* @param parseCtx
* @return The TableScanOperator inserted before child.
*/
- protected static TableScanOperator createTemporaryFile(
+ public static TableScanOperator createTemporaryFile(
Operator<? extends OperatorDesc> parent, Operator<? extends OperatorDesc> child,
Path taskTmpDir, TableDesc tt_desc, ParseContext parseCtx) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java?rev=1646144&r1=1646143&r2=1646144&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java Wed Dec 17 03:19:57 2014
@@ -82,22 +82,13 @@ public class GenSparkSkewJoinProcessor {
public static void processSkewJoin(JoinOperator joinOp, Task<? extends Serializable> currTask,
ReduceWork reduceWork, ParseContext parseCtx) throws SemanticException {
- // We are trying to adding map joins to handle skew keys, and map join right
- // now does not work with outer joins
- if (!GenMRSkewJoinProcessor.skewJoinEnabled(parseCtx.getConf(), joinOp) ||
- !(currTask instanceof SparkTask)) {
- return;
- }
SparkWork currentWork = ((SparkTask) currTask).getWork();
- if (!supportRuntimeSkewJoin(currentWork, reduceWork)) {
+ if (currentWork.getChildren(reduceWork).size() > 0) {
+ LOG.warn("Skip runtime skew join as the ReduceWork has child work and hasn't been split.");
return;
}
List<Task<? extends Serializable>> children = currTask.getChildTasks();
- if (children != null && children.size() > 1) {
- LOG.warn("Skip runtime skew join as current task has multiple children.");
- return;
- }
Task<? extends Serializable> child =
children != null && children.size() == 1 ? children.get(0) : null;
@@ -424,17 +415,11 @@ public class GenSparkSkewJoinProcessor {
mapJoinDesc.setHashTableMemoryUsage(hashtableMemoryUsage);
}
- /**
- * Currently, we only support the simplest cases where join is the last work
- * of a spark work, i.e. the current ReduceWork is a leave work
- * If the reduce work has follow-up work, e.g. an aggregation following the join,
- * it's difficult to union the results of the original join and conditional map join
- * and feed that to the follow up works. This is not an issue for MR, where ReduceWork
- * is always a terminal work.
- *
- * TODO: can we relax this
- */
- private static boolean supportRuntimeSkewJoin(SparkWork sparkWork, BaseWork work) {
- return sparkWork.getChildren(work).isEmpty();
+ // check this before calling processSkewJoin
+ public static boolean supportRuntimeSkewJoin(JoinOperator joinOp,
+ Task<? extends Serializable> currTask, HiveConf hiveConf) {
+ List<Task<? extends Serializable>> children = currTask.getChildTasks();
+ return GenMRSkewJoinProcessor.skewJoinEnabled(hiveConf, joinOp) &&
+ (children == null || children.size() <= 1);
}
}
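
The check is split out so callers can test it before doing any plan surgery. A short usage sketch of the intended order, mirroring the new caller in SparkSkewJoinProcFactory further down in this commit (sketch only, not standalone code: joinOp, currTask, reduceWork, and parseCtx come from the surrounding processor, and splitTask is the private helper introduced below):

    // Cheap precondition first, plan rewrite second.
    if (GenSparkSkewJoinProcessor.supportRuntimeSkewJoin(joinOp, currTask, parseCtx.getConf())) {
        // split first so the join's ReduceWork becomes a leaf of its SparkWork
        splitTask((SparkTask) currTask, reduceWork, parseCtx);
        GenSparkSkewJoinProcessor.processSkewJoin(joinOp, currTask, reduceWork, parseCtx);
    }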
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java?rev=1646144&r1=1646143&r2=1646144&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java Wed Dec 17 03:19:57 2014
@@ -78,7 +78,7 @@ public class SparkMapJoinResolver implem
return matchingOps != null && !matchingOps.isEmpty();
}
- private Set<Operator<? extends OperatorDesc>> getOp(BaseWork work, Class<?> clazz) {
+ public static Set<Operator<? extends OperatorDesc>> getOp(BaseWork work, Class<?> clazz) {
Set<Operator<? extends OperatorDesc>> ops = new HashSet<Operator<? extends OperatorDesc>>();
if (work instanceof MapWork) {
Collection<Operator<?>> opSet = ((MapWork) work).getAliasToWork().values();
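
Making getOp public static lets the skew-join code reuse the operator scan; the new call site added in this commit (from SparkSkewJoinProcFactory.splitTask below) looks like:

    // Find the single ReduceSinkOperator feeding the child work.
    Set<Operator<? extends OperatorDesc>> reduceSinkSet =
        SparkMapJoinResolver.getOp(reduceWork, ReduceSinkOperator.class);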
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java?rev=1646144&r1=1646143&r2=1646144&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java Wed Dec 17 03:19:57 2014
@@ -18,18 +18,41 @@
package org.apache.hadoop.hive.ql.optimizer.spark;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.optimizer.physical.GenSparkSkewJoinProcessor;
import org.apache.hadoop.hive.ql.optimizer.physical.SkewJoinProcFactory;
+import org.apache.hadoop.hive.ql.optimizer.physical.SparkMapJoinResolver;
import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
import java.util.Stack;
/**
@@ -55,17 +78,136 @@ public class SparkSkewJoinProcFactory {
Object... nodeOutputs) throws SemanticException {
SparkSkewJoinResolver.SparkSkewJoinProcCtx context =
(SparkSkewJoinResolver.SparkSkewJoinProcCtx) procCtx;
+ Task<? extends Serializable> currentTsk = context.getCurrentTask();
JoinOperator op = (JoinOperator) nd;
- if (op.getConf().isFixedAsSorted()) {
- return null;
- }
+ ReduceWork reduceWork = context.getReducerToReduceWork().get(op);
ParseContext parseContext = context.getParseCtx();
- Task<? extends Serializable> currentTsk = context.getCurrentTask();
- if (currentTsk instanceof SparkTask) {
- GenSparkSkewJoinProcessor.processSkewJoin(op, currentTsk,
- context.getReducerToReduceWork().get(op), parseContext);
+ if (!op.getConf().isFixedAsSorted() && currentTsk instanceof SparkTask &&
+ reduceWork != null && ((SparkTask) currentTsk).getWork().contains(reduceWork) &&
+ GenSparkSkewJoinProcessor.supportRuntimeSkewJoin(
+ op, currentTsk, parseContext.getConf())) {
+ // first we try to split the task
+ splitTask((SparkTask) currentTsk, reduceWork, parseContext);
+ GenSparkSkewJoinProcessor.processSkewJoin(op, currentTsk, reduceWork, parseContext);
}
return null;
}
}
+
+ /**
+ * If the join is not in a leaf ReduceWork, the spark task has to be split into 2 tasks.
+ */
+ private static void splitTask(SparkTask currentTask, ReduceWork reduceWork,
+ ParseContext parseContext) throws SemanticException {
+ SparkWork currentWork = currentTask.getWork();
+ Set<Operator<? extends OperatorDesc>> reduceSinkSet =
+ SparkMapJoinResolver.getOp(reduceWork, ReduceSinkOperator.class);
+ if (currentWork.getChildren(reduceWork).size() == 1 && canSplit(currentWork) &&
+ reduceSinkSet.size() == 1) {
+ ReduceSinkOperator reduceSink = (ReduceSinkOperator) reduceSinkSet.iterator().next();
+ BaseWork childWork = currentWork.getChildren(reduceWork).get(0);
+ SparkEdgeProperty originEdge = currentWork.getEdgeProperty(reduceWork, childWork);
+ // disconnect the reduce work from its child; this should produce two isolated subgraphs
+ currentWork.disconnect(reduceWork, childWork);
+ // move works following the current reduce work into a new spark work
+ SparkWork newWork =
+ new SparkWork(parseContext.getConf().getVar(HiveConf.ConfVars.HIVEQUERYID));
+ newWork.add(childWork);
+ copyWorkGraph(currentWork, newWork, childWork, true);
+ copyWorkGraph(currentWork, newWork, childWork, false);
+ // remove them from current spark work
+ for (BaseWork baseWork : newWork.getAllWorkUnsorted()) {
+ currentWork.remove(baseWork);
+ // TODO: take care of cloneToWork?
+ currentWork.getCloneToWork().remove(baseWork);
+ }
+ // create TS to read intermediate data
+ Context baseCtx = parseContext.getContext();
+ Path taskTmpDir = baseCtx.getMRTmpPath();
+ Operator<? extends OperatorDesc> rsParent = reduceSink.getParentOperators().get(0);
+ TableDesc tableDesc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
+ .getFieldSchemasFromRowSchema(rsParent.getSchema(), "temporarycol"));
+ // this will insert FS and TS between the RS and its parent
+ TableScanOperator tableScanOp = GenMapRedUtils.createTemporaryFile(
+ rsParent, reduceSink, taskTmpDir, tableDesc, parseContext);
+ // create new MapWork
+ MapWork mapWork = PlanUtils.getMapRedWork().getMapWork();
+ mapWork.setName("Map " + GenSparkUtils.getUtils().getNextSeqNumber());
+ newWork.add(mapWork);
+ newWork.connect(mapWork, childWork, originEdge);
+ // setup the new map work
+ String streamDesc = taskTmpDir.toUri().toString();
+ if (GenMapRedUtils.needsTagging((ReduceWork) childWork)) {
+ Operator<? extends OperatorDesc> childReducer = ((ReduceWork) childWork).getReducer();
+ QBJoinTree joinTree = null;
+ if (childReducer instanceof JoinOperator) {
+ joinTree = parseContext.getJoinContext().get(childReducer);
+ } else if (childReducer instanceof MapJoinOperator) {
+ joinTree = parseContext.getMapJoinContext().get(childReducer);
+ } else if (childReducer instanceof SMBMapJoinOperator) {
+ joinTree = parseContext.getSmbMapJoinContext().get(childReducer);
+ }
+ if (joinTree != null && joinTree.getId() != null) {
+ streamDesc = joinTree.getId() + ":$INTNAME";
+ } else {
+ streamDesc = "$INTNAME";
+ }
+ // TODO: remove this?
+ String origStreamDesc = streamDesc;
+ int pos = 0;
+ while (mapWork.getAliasToWork().get(streamDesc) != null) {
+ streamDesc = origStreamDesc.concat(String.valueOf(++pos));
+ }
+ }
+ GenMapRedUtils.setTaskPlan(taskTmpDir.toUri().toString(), streamDesc,
+ tableScanOp, mapWork, false, tableDesc);
+ // insert the new task between current task and its child
+ Task<? extends Serializable> newTask = TaskFactory.get(newWork, parseContext.getConf());
+ List<Task<? extends Serializable>> childTasks = currentTask.getChildTasks();
+ // must have at most one child
+ if (childTasks != null && childTasks.size() > 0) {
+ Task<? extends Serializable> childTask = childTasks.get(0);
+ currentTask.removeDependentTask(childTask);
+ newTask.addDependentTask(childTask);
+ }
+ currentTask.addDependentTask(newTask);
+ newTask.setFetchSource(currentTask.isFetchSource());
+ }
+ }
+
+ /**
+ * Whether we can split at reduceWork. For simplicity, we require that each work
+ * have at most one child work. This may be relaxed by checking connectivity of the
+ * work graph after disconnecting the current reduce work from its child.
+ */
+ private static boolean canSplit(SparkWork sparkWork) {
+ for (BaseWork baseWork : sparkWork.getAllWorkUnsorted()) {
+ if (sparkWork.getChildren(baseWork).size() > 1) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Copy a sub-graph from originWork to newWork
+ */
+ private static void copyWorkGraph(SparkWork originWork, SparkWork newWork,
+ BaseWork baseWork, boolean upWards) {
+ if (upWards) {
+ for (BaseWork parent : originWork.getParents(baseWork)) {
+ newWork.add(parent);
+ SparkEdgeProperty edgeProperty = originWork.getEdgeProperty(parent, baseWork);
+ newWork.connect(parent, baseWork, edgeProperty);
+ copyWorkGraph(originWork, newWork, parent, true);
+ }
+ } else {
+ for (BaseWork child : originWork.getChildren(baseWork)) {
+ newWork.add(child);
+ SparkEdgeProperty edgeProperty = originWork.getEdgeProperty(baseWork, child);
+ newWork.connect(baseWork, child, edgeProperty);
+ copyWorkGraph(originWork, newWork, child, false);
+ }
+ }
+ }
}
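
Reduced to a generic directed graph, splitTask cuts the single edge below the join's ReduceWork and moves the disconnected component containing the child into a new graph; a temp-file-backed MapWork then bridges the two tasks. A self-contained sketch of the graph part (toy String nodes and adjacency maps, not the SparkWork API; the temp-file/MapWork bridge is omitted):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class SplitTaskSketch {
        static Map<String, List<String>> children = new HashMap<>();
        static Map<String, List<String>> parents = new HashMap<>();

        static void connect(String a, String b) {
            children.computeIfAbsent(a, k -> new ArrayList<>()).add(b);
            parents.computeIfAbsent(b, k -> new ArrayList<>()).add(a);
        }

        // Mirrors the two copyWorkGraph calls: walk up through parents and
        // down through children, collecting every work that must move along.
        static void collect(String start, Set<String> out) {
            if (!out.add(start)) return;
            for (String p : parents.getOrDefault(start, List.of())) collect(p, out);
            for (String c : children.getOrDefault(start, List.of())) collect(c, out);
        }

        public static void main(String[] args) {
            connect("Map 1", "Reducer 2");     // the skewed join runs here
            connect("Reducer 2", "Reducer 3"); // an aggregation follows the join
            // Cut below the join, as splitTask does via currentWork.disconnect().
            children.get("Reducer 2").remove("Reducer 3");
            parents.get("Reducer 3").remove("Reducer 2");
            Set<String> newTask = new LinkedHashSet<>();
            collect("Reducer 3", newTask);
            System.out.println("moves to the new SparkWork: " + newTask);
        }
    }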
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java?rev=1646144&r1=1646143&r2=1646144&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java Wed Dec 17 03:19:57 2014
@@ -41,8 +41,10 @@ import org.apache.hadoop.hive.ql.plan.Sp
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Stack;
@@ -75,9 +77,6 @@ public class SparkSkewJoinResolver imple
Task<? extends Serializable> task = (Task<? extends Serializable>) nd;
if (task instanceof SparkTask) {
SparkWork sparkWork = ((SparkTask) task).getWork();
- if (sparkWork.getAllReduceWork().isEmpty()) {
- return null;
- }
SparkSkewJoinProcCtx skewJoinProcCtx =
new SparkSkewJoinProcCtx(task, physicalContext.getParseContext());
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
@@ -87,7 +86,10 @@ public class SparkSkewJoinResolver imple
SparkSkewJoinProcFactory.getDefaultProc(), opRules, skewJoinProcCtx);
GraphWalker ogw = new DefaultGraphWalker(disp);
ArrayList<Node> topNodes = new ArrayList<Node>();
- for (ReduceWork reduceWork : sparkWork.getAllReduceWork()) {
+ // since we may need to split the task, let's walk the graph bottom-up
+ List<ReduceWork> reduceWorkList = sparkWork.getAllReduceWork();
+ Collections.reverse(reduceWorkList);
+ for (ReduceWork reduceWork : reduceWorkList) {
topNodes.add(reduceWork.getReducer());
skewJoinProcCtx.getReducerToReduceWork().put(reduceWork.getReducer(), reduceWork);
}
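
The reversal matters because getAllReduceWork() returns works in sorted, parent-before-child order (per the comment change in SparkWork.java below): walking the reversed list handles leaf joins first, so splitting the task at one ReduceWork cannot disturb a work that has already been processed. In miniature:

    // Toy illustration: reverse a topologically sorted list so downstream
    // (leaf) works are visited before their ancestors.
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class BottomUpWalkSketch {
        public static void main(String[] args) {
            List<String> topological = new ArrayList<>(List.of("Reducer 2", "Reducer 3"));
            Collections.reverse(topological);
            // Prints [Reducer 3, Reducer 2]: the leaf join is handled first.
            System.out.println(topological);
        }
    }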
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java?rev=1646144&r1=1646143&r2=1646144&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java Wed Dec 17 03:19:57 2014
@@ -175,6 +175,7 @@ public class SparkWork extends AbstractO
if (getChildren(a).isEmpty()) {
leaves.add(a);
}
+ edgeProperties.remove(new ImmutablePair<BaseWork, BaseWork>(a, b));
}
/**
@@ -397,7 +398,7 @@ public class SparkWork extends AbstractO
return result;
}
- // get all reduce works in this spark work
+ // get all reduce works in this spark work in sorted order
public List<ReduceWork> getAllReduceWork() {
List<ReduceWork> result = new ArrayList<ReduceWork>();
for (BaseWork work : getAllWork()) {
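
On the one-line disconnect() change above: SparkWork tracks edge metadata in a separate edgeProperties map keyed by (parent, child) pairs, so removing an edge from the graph must also evict its property, or a stale entry survives, which is presumably needed now that splitTask disconnects and reconnects works. A minimal sketch of that bookkeeping (toy string types in place of BaseWork and SparkEdgeProperty):

    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DisconnectSketch {
        Map<String, List<String>> workGraph = new HashMap<>();
        Map<SimpleImmutableEntry<String, String>, String> edgeProperties = new HashMap<>();

        void connect(String a, String b, String edgeProperty) {
            workGraph.computeIfAbsent(a, k -> new ArrayList<>()).add(b);
            edgeProperties.put(new SimpleImmutableEntry<>(a, b), edgeProperty);
        }

        void disconnect(String a, String b) {
            workGraph.getOrDefault(a, new ArrayList<>()).remove(b);
            // Without this line a stale property would outlive the edge --
            // the bookkeeping this commit adds to SparkWork.disconnect().
            edgeProperties.remove(new SimpleImmutableEntry<>(a, b));
        }

        public static void main(String[] args) {
            DisconnectSketch w = new DisconnectSketch();
            w.connect("Reducer 2", "Reducer 3", "GROUP");
            w.disconnect("Reducer 2", "Reducer 3");
            System.out.println(w.edgeProperties.isEmpty()); // true: no stale entry
        }
    }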
Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out?rev=1646144&r1=1646143&r2=1646144&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out Wed Dec 17 03:19:57 2014
@@ -607,14 +607,17 @@ SELECT sum(hash(Y.key)), sum(hash(Y.valu
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
- Stage-0 depends on stages: Stage-1
+ Stage-4 depends on stages: Stage-1 , consists of Stage-5, Stage-2
+ Stage-5
+ Stage-3 depends on stages: Stage-5
+ Stage-2 depends on stages: Stage-3
+ Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Spark
Edges:
Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 3), Map 4 (PARTITION-LEVEL SORT, 3)
- Reducer 3 <- Reducer 2 (GROUP, 1)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -660,6 +663,7 @@ STAGE PLANS:
condition expressions:
0
1 {KEY.reducesinkkey0} {VALUE._col0}
+ handleSkewJoin: true
outputColumnNames: _col2, _col3
Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
Group By Operator
@@ -667,6 +671,72 @@ STAGE PLANS:
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+ Stage: Stage-4
+ Conditional Operator
+
+ Stage: Stage-5
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 7
+ Map Operator Tree:
+ TableScan
+ Spark HashTable Sink Operator
+ condition expressions:
+ 0
+ 1 {1_VALUE_0} {1_VALUE_1}
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-3
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 6
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0
+ 1 {1_VALUE_0} {1_VALUE_1}
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ outputColumnNames: _col2, _col3
+ Group By Operator
+ aggregations: sum(hash(_col2)), sum(hash(_col3))
+ mode: hash
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-2
+ Spark
+ Edges:
+ Reducer 3 <- Map 5 (GROUP, 1)
+#### A masked pattern was here ####
+ Vertices:
+ Map 5
+ Map Operator Tree:
+ TableScan
Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
@@ -731,14 +801,17 @@ SELECT sum(hash(Y.key)), sum(hash(Y.valu
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
- Stage-0 depends on stages: Stage-1
+ Stage-4 depends on stages: Stage-1 , consists of Stage-5, Stage-2
+ Stage-5
+ Stage-3 depends on stages: Stage-5
+ Stage-2 depends on stages: Stage-3
+ Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Spark
Edges:
Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 3), Map 4 (PARTITION-LEVEL SORT, 3)
- Reducer 3 <- Reducer 2 (GROUP, 1)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -784,6 +857,7 @@ STAGE PLANS:
condition expressions:
0
1 {KEY.reducesinkkey0} {VALUE._col0}
+ handleSkewJoin: true
outputColumnNames: _col2, _col3
Statistics: Num rows: 137 Data size: 1460 Basic stats: COMPLETE Column stats: NONE
Group By Operator
@@ -791,6 +865,72 @@ STAGE PLANS:
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+ Stage: Stage-4
+ Conditional Operator
+
+ Stage: Stage-5
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 7
+ Map Operator Tree:
+ TableScan
+ Spark HashTable Sink Operator
+ condition expressions:
+ 0
+ 1 {1_VALUE_0} {1_VALUE_1}
+ keys:
+ 0 reducesinkkey0 (type: string), reducesinkkey1 (type: double)
+ 1 reducesinkkey0 (type: string), reducesinkkey1 (type: double)
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-3
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 6
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0
+ 1 {1_VALUE_0} {1_VALUE_1}
+ keys:
+ 0 reducesinkkey0 (type: string), reducesinkkey1 (type: double)
+ 1 reducesinkkey0 (type: string), reducesinkkey1 (type: double)
+ outputColumnNames: _col2, _col3
+ Group By Operator
+ aggregations: sum(hash(_col2)), sum(hash(_col3))
+ mode: hash
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-2
+ Spark
+ Edges:
+ Reducer 3 <- Map 5 (GROUP, 1)
+#### A masked pattern was here ####
+ Vertices:
+ Map 5
+ Map Operator Tree:
+ TableScan
Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
@@ -863,14 +1003,19 @@ ON src1.c1 = src3.c5 AND src3.c5 < 80
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
- Stage-0 depends on stages: Stage-1
+ Stage-5 depends on stages: Stage-1 , consists of Stage-6, Stage-7, Stage-2
+ Stage-6
+ Stage-3 depends on stages: Stage-6
+ Stage-2 depends on stages: Stage-3, Stage-4
+ Stage-7
+ Stage-4 depends on stages: Stage-7
+ Stage-0 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-1
Spark
Edges:
Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 3), Map 4 (PARTITION-LEVEL SORT, 3), Map 5 (PARTITION-LEVEL SORT, 3)
- Reducer 3 <- Reducer 2 (GROUP, 1)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -935,6 +1080,7 @@ STAGE PLANS:
0 {KEY.reducesinkkey0}
1 {VALUE._col0}
2
+ handleSkewJoin: true
outputColumnNames: _col0, _col3
Statistics: Num rows: 121 Data size: 1284 Basic stats: COMPLETE Column stats: NONE
Group By Operator
@@ -942,6 +1088,91 @@ STAGE PLANS:
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+ Stage: Stage-5
+ Conditional Operator
+
+ Stage: Stage-6
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 8
+ Map Operator Tree:
+ TableScan
+ Spark HashTable Sink Operator
+ condition expressions:
+ 0 {0_VALUE_0}
+ 1 {1_VALUE_0}
+ 2
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ 2 reducesinkkey0 (type: string)
+ Local Work:
+ Map Reduce Local Work
+ Map 9
+ Map Operator Tree:
+ TableScan
+ Spark HashTable Sink Operator
+ condition expressions:
+ 0 {0_VALUE_0}
+ 1 {1_VALUE_0}
+ 2
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ 2 reducesinkkey0 (type: string)
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-3
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 7
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ Inner Join 0 to 2
+ condition expressions:
+ 0 {0_VALUE_0}
+ 1 {1_VALUE_0}
+ 2
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ 2 reducesinkkey0 (type: string)
+ outputColumnNames: _col0, _col3
+ Group By Operator
+ aggregations: sum(hash(_col0)), sum(hash(_col3))
+ mode: hash
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-2
+ Spark
+ Edges:
+ Reducer 3 <- Map 6 (GROUP, 1)
+#### A masked pattern was here ####
+ Vertices:
+ Map 6
+ Map Operator Tree:
+ TableScan
Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
@@ -965,6 +1196,72 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Stage: Stage-7
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 10
+ Map Operator Tree:
+ TableScan
+ Spark HashTable Sink Operator
+ condition expressions:
+ 0 {0_VALUE_0}
+ 1 {1_VALUE_0}
+ 2
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ 2 reducesinkkey0 (type: string)
+ Local Work:
+ Map Reduce Local Work
+ Map 12
+ Map Operator Tree:
+ TableScan
+ Spark HashTable Sink Operator
+ condition expressions:
+ 0 {0_VALUE_0}
+ 1 {1_VALUE_0}
+ 2
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ 2 reducesinkkey0 (type: string)
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-4
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 11
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ Inner Join 0 to 2
+ condition expressions:
+ 0 {0_VALUE_0}
+ 1 {1_VALUE_0}
+ 2
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ 2 reducesinkkey0 (type: string)
+ outputColumnNames: _col0, _col3
+ Group By Operator
+ aggregations: sum(hash(_col0)), sum(hash(_col3))
+ mode: hash
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
Stage: Stage-0
Fetch Operator
limit: -1
Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin_noskew.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin_noskew.q.out?rev=1646144&r1=1646143&r2=1646144&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin_noskew.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin_noskew.q.out Wed Dec 17 03:19:57 2014
@@ -6,16 +6,19 @@ create table noskew as select a.* from s
POSTHOOK: type: CREATETABLE_AS_SELECT
STAGE DEPENDENCIES:
Stage-1 is a root stage
- Stage-0 depends on stages: Stage-1
- Stage-3 depends on stages: Stage-0
- Stage-2 depends on stages: Stage-3
+ Stage-5 depends on stages: Stage-1 , consists of Stage-6, Stage-3
+ Stage-6
+ Stage-4 depends on stages: Stage-6
+ Stage-3 depends on stages: Stage-4
+ Stage-0 depends on stages: Stage-3
+ Stage-7 depends on stages: Stage-0
+ Stage-2 depends on stages: Stage-7
STAGE PLANS:
Stage: Stage-1
Spark
Edges:
Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 1), Map 4 (PARTITION-LEVEL SORT, 1)
- Reducer 3 <- Reducer 2 (SORT, 1)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -53,13 +56,76 @@ STAGE PLANS:
condition expressions:
0 {KEY.reducesinkkey0} {VALUE._col0}
1
+ handleSkewJoin: true
outputColumnNames: _col0, _col1
Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: string)
- sort order: +
- Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
- value expressions: _col1 (type: string)
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+ Stage: Stage-5
+ Conditional Operator
+
+ Stage: Stage-6
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 7
+ Map Operator Tree:
+ TableScan
+ Spark HashTable Sink Operator
+ condition expressions:
+ 0 {0_VALUE_0} {0_VALUE_1}
+ 1
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-4
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 6
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {0_VALUE_0} {0_VALUE_1}
+ 1
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-3
+ Spark
+ Edges:
+ Reducer 3 <- Map 5 (SORT, 1)
+#### A masked pattern was here ####
+ Vertices:
+ Map 5
+ Map Operator Tree:
+ TableScan
+ Reduce Output Operator
+ key expressions: _col0 (type: string)
+ sort order: +
+ Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col1 (type: string)
Reducer 3
Reduce Operator Tree:
Select Operator
@@ -84,7 +150,7 @@ STAGE PLANS:
hdfs directory: true
#### A masked pattern was here ####
- Stage: Stage-3
+ Stage: Stage-7
Create Table Operator:
Create Table
columns: key string, value string