You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/12/12 14:24:02 UTC
svn commit: r1644869 - in /hive/branches/spark/ql/src:
java/org/apache/hadoop/hive/ql/optimizer/physical/
java/org/apache/hadoop/hive/ql/parse/spark/
test/results/clientpositive/spark/
Author: xuefu
Date: Fri Dec 12 13:24:01 2014
New Revision: 1644869
URL: http://svn.apache.org/r1644869
Log:
HIVE-8913: Make SparkMapJoinResolver handle runtime skew join [Spark Branch] (Rui via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join_stats.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java?rev=1644869&r1=1644868&r2=1644869&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java Fri Dec 12 13:24:01 2014
@@ -296,9 +296,6 @@ public class GenSparkSkewJoinProcessor {
path = bigKeysDirMap.get(tags[j]);
bigKeysDirToTaskMap.put(path, skewJoinMapJoinTask);
bigMapWork = mapWork;
- // in MR, ReduceWork is a terminal work, but that's not the case for spark, therefore for
- // big dir MapWork, we'll have to clone all dependent works in the original work graph
- cloneWorkGraph(currentWork, sparkWork, reduceWork, mapWork);
} else {
path = smallTblDirs.get(tags[j]);
}
@@ -379,6 +376,8 @@ public class GenSparkSkewJoinProcessor {
dummyOp.setChildOperators(mapJoinChildren);
bigMapWork.addDummyOp(dummyOp);
MapJoinDesc mjDesc = mapJoinOp.getConf();
+ // mapjoin should not be affected by join reordering
+ mjDesc.resetOrder();
SparkHashTableSinkDesc hashTableSinkDesc = new SparkHashTableSinkDesc(mjDesc);
SparkHashTableSinkOperator hashTableSinkOp =
(SparkHashTableSinkOperator) OperatorFactory.get(hashTableSinkDesc);
@@ -398,6 +397,7 @@ public class GenSparkSkewJoinProcessor {
new ArrayList<Operator<? extends OperatorDesc>>();
tableScanParents.add(tableScan);
hashTableSinkOp.setParentOperators(tableScanParents);
+ hashTableSinkOp.setTag(tag);
}
private static void setMemUsage(MapJoinOperator mapJoinOp, Task<? extends Serializable> task,
@@ -412,8 +412,6 @@ public class GenSparkSkewJoinProcessor {
return;
}
MapJoinDesc mapJoinDesc = mapJoinOp.getConf();
- // mapjoin should not affected by join reordering
- mapJoinDesc.resetOrder();
HiveConf conf = context.getParseCtx().getConf();
float hashtableMemoryUsage;
if (context.isFollowedByGroupBy()) {
@@ -426,42 +424,17 @@ public class GenSparkSkewJoinProcessor {
mapJoinDesc.setHashTableMemoryUsage(hashtableMemoryUsage);
}
- private static void cloneWorkGraph(SparkWork originSparkWork, SparkWork newSparkWork,
- BaseWork originWork, BaseWork newWork) {
- for (BaseWork child : originSparkWork.getChildren(originWork)) {
- SparkEdgeProperty edgeProperty = originSparkWork.getEdgeProperty(originWork, child);
- BaseWork cloneChild = Utilities.cloneBaseWork(child);
- cloneChild.setName(cloneChild.getName().replaceAll("^([a-zA-Z]+)(\\s+)(\\d+)",
- "$1$2" + GenSparkUtils.getUtils().getNextSeqNumber()));
- newSparkWork.add(cloneChild);
- newSparkWork.connect(newWork, cloneChild, edgeProperty);
- cloneWorkGraph(originSparkWork, newSparkWork, child, cloneChild);
- }
- }
-
/**
- * ReduceWork is not terminal work in spark, so we disable runtime skew join for
- * some complicated cases for now, leaving them to future tasks.
- * As an example, consider the following spark work graph:
- * M1 M5
- * \ /
- * R2 (join) M6
- * \ /
- * R3 (join)
- * |
- * R4 (group)
- * If we create map join task for R2, we have to clone M6 as well so that the results
- * get joined properly.
+ * Currently, we only support the simplest cases where join is the last work
+ * of a spark work, i.e. the current ReduceWork is a leaf work.
+ * If the reduce work has follow-up work, e.g. an aggregation following the join,
+ * it's difficult to union the results of the original join and conditional map join
+ * and feed that to the follow up works. This is not an issue for MR, where ReduceWork
+ * is always a terminal work.
*
- * Let's only support the case where downstream work of the current ReduceWork all
- * have single parent.
+ * TODO: can we relax this?
*/
private static boolean supportRuntimeSkewJoin(SparkWork sparkWork, BaseWork work) {
- for (BaseWork child : sparkWork.getChildren(work)) {
- if (sparkWork.getParents(child).size() > 1 || !supportRuntimeSkewJoin(sparkWork, child)) {
- return false;
- }
- }
- return true;
+ return sparkWork.getChildren(work).isEmpty();
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java?rev=1644869&r1=1644868&r2=1644869&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java Fri Dec 12 13:24:01 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.optimi
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -31,6 +32,7 @@ import java.util.Stack;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
@@ -43,6 +45,9 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin;
+import org.apache.hadoop.hive.ql.plan.ConditionalWork;
import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -244,7 +249,8 @@ public class SparkMapJoinResolver implem
// all the parent SparkTasks that this new task depends on, if they don't already exist.
private SparkTask createSparkTask(SparkTask originalTask,
SparkWork sparkWork,
- Map<SparkWork, SparkTask> createdTaskMap) {
+ Map<SparkWork, SparkTask> createdTaskMap,
+ ConditionalTask conditionalTask) {
if (createdTaskMap.containsKey(sparkWork)) {
return createdTaskMap.get(sparkWork);
}
@@ -252,19 +258,27 @@ public class SparkMapJoinResolver implem
originalTask : (SparkTask) TaskFactory.get(sparkWork, physicalContext.conf);
if (!dependencyGraph.get(sparkWork).isEmpty()) {
for (SparkWork parentWork : dependencyGraph.get(sparkWork)) {
- SparkTask parentTask = createSparkTask(originalTask, parentWork, createdTaskMap);
+ SparkTask parentTask =
+ createSparkTask(originalTask, parentWork, createdTaskMap, conditionalTask);
parentTask.addDependentTask(resultTask);
}
} else {
if (originalTask != resultTask) {
List<Task<? extends Serializable>> parentTasks = originalTask.getParentTasks();
if (parentTasks != null && parentTasks.size() > 0) {
+ // avoid concurrent modification
+ originalTask.setParentTasks(new ArrayList<Task<? extends Serializable>>());
for (Task<? extends Serializable> parentTask : parentTasks) {
parentTask.addDependentTask(resultTask);
+ parentTask.removeDependentTask(originalTask);
}
} else {
- physicalContext.addToRootTask(resultTask);
- physicalContext.removeFromRootTask(originalTask);
+ if (conditionalTask == null) {
+ physicalContext.addToRootTask(resultTask);
+ physicalContext.removeFromRootTask(originalTask);
+ } else {
+ updateConditionalTask(conditionalTask, originalTask, resultTask);
+ }
}
}
}
@@ -277,36 +291,100 @@ public class SparkMapJoinResolver implem
public Object dispatch(Node nd, Stack<Node> stack, Object... nos)
throws SemanticException {
Task<? extends Serializable> currentTask = (Task<? extends Serializable>) nd;
- if (currentTask instanceof SparkTask) {
- SparkTask sparkTask = (SparkTask) currentTask;
- SparkWork sparkWork = sparkTask.getWork();
-
- // Generate MapredLocalWorks for MJ and HTS
- generateLocalWork(sparkTask);
-
- dependencyGraph.put(sparkWork, new ArrayList<SparkWork>());
- Set<BaseWork> leaves = sparkWork.getLeaves();
- for (BaseWork leaf : leaves) {
- moveWork(sparkWork, leaf, sparkWork);
- }
-
- // Now remove all BaseWorks in all the childSparkWorks that we created
- // from the original SparkWork
- for (SparkWork newSparkWork : sparkWorkMap.values()) {
- for (BaseWork work : newSparkWork.getAllWorkUnsorted()) {
- sparkWork.remove(work);
+ if(currentTask.isMapRedTask()) {
+ if (currentTask instanceof ConditionalTask) {
+ List<Task<? extends Serializable>> taskList =
+ ((ConditionalTask) currentTask).getListTasks();
+ for (Task<? extends Serializable> tsk : taskList) {
+ if (tsk instanceof SparkTask) {
+ processCurrentTask((SparkTask) tsk, (ConditionalTask) currentTask);
+ }
}
+ } else if (currentTask instanceof SparkTask) {
+ processCurrentTask((SparkTask) currentTask, null);
}
+ }
- Map<SparkWork, SparkTask> createdTaskMap = new LinkedHashMap<SparkWork, SparkTask>();
+ return null;
+ }
- // Now create SparkTasks from the SparkWorks, also set up dependency
- for (SparkWork work : dependencyGraph.keySet()) {
- createSparkTask(sparkTask, work, createdTaskMap);
+ /**
+ * @param sparkTask The current spark task we're processing.
+ * @param conditionalTask If conditional task is not null, it means the current task is
+ * wrapped in its task list.
+ */
+ private void processCurrentTask(SparkTask sparkTask, ConditionalTask conditionalTask) {
+ dependencyGraph.clear();
+ sparkWorkMap.clear();
+ SparkWork sparkWork = sparkTask.getWork();
+
+ // Generate MapredLocalWorks for MJ and HTS
+ generateLocalWork(sparkTask);
+
+ dependencyGraph.put(sparkWork, new ArrayList<SparkWork>());
+ Set<BaseWork> leaves = sparkWork.getLeaves();
+ for (BaseWork leaf : leaves) {
+ moveWork(sparkWork, leaf, sparkWork);
+ }
+
+ // Now remove all BaseWorks in all the childSparkWorks that we created
+ // from the original SparkWork
+ for (SparkWork newSparkWork : sparkWorkMap.values()) {
+ for (BaseWork work : newSparkWork.getAllWorkUnsorted()) {
+ sparkWork.remove(work);
}
}
- return null;
+ Map<SparkWork, SparkTask> createdTaskMap = new LinkedHashMap<SparkWork, SparkTask>();
+
+ // Now create SparkTasks from the SparkWorks, also set up dependency
+ for (SparkWork work : dependencyGraph.keySet()) {
+ createSparkTask(sparkTask, work, createdTaskMap, conditionalTask);
+ }
+ }
+
+ /**
+ * Update the task/work list of this conditional task to replace originalTask with newTask.
+ * For runtime skew join, also update dirToTaskMap for the conditional resolver
+ */
+ private void updateConditionalTask(ConditionalTask conditionalTask,
+ SparkTask originalTask, SparkTask newTask) {
+ ConditionalWork conditionalWork = conditionalTask.getWork();
+ SparkWork originWork = originalTask.getWork();
+ SparkWork newWork = newTask.getWork();
+ List<Task<? extends Serializable>> listTask = conditionalTask.getListTasks();
+ List<Serializable> listWork = (List<Serializable>) conditionalWork.getListWorks();
+ int taskIndex = listTask.indexOf(originalTask);
+ int workIndex = listWork.indexOf(originWork);
+ if (taskIndex < 0 || workIndex < 0) {
+ return;
+ }
+ listTask.set(taskIndex, newTask);
+ listWork.set(workIndex, newWork);
+ ConditionalResolver resolver = conditionalTask.getResolver();
+ if (resolver instanceof ConditionalResolverSkewJoin) {
+ // get bigKeysDirToTaskMap
+ ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx context =
+ (ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx) conditionalTask
+ .getResolverCtx();
+ HashMap<Path, Task<? extends Serializable>> bigKeysDirToTaskMap = context
+ .getDirToTaskMap();
+ // to avoid concurrently modifying the hashmap
+ HashMap<Path, Task<? extends Serializable>> newbigKeysDirToTaskMap =
+ new HashMap<Path, Task<? extends Serializable>>();
+ // reset the resolver
+ for (Map.Entry<Path, Task<? extends Serializable>> entry :
+ bigKeysDirToTaskMap.entrySet()) {
+ Task<? extends Serializable> task = entry.getValue();
+ Path bigKeyDir = entry.getKey();
+ if (task.equals(originalTask)) {
+ newbigKeysDirToTaskMap.put(bigKeyDir, newTask);
+ } else {
+ newbigKeysDirToTaskMap.put(bigKeyDir, task);
+ }
+ }
+ context.setDirToTaskMap(newbigKeysDirToTaskMap);
+ }
}
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1644869&r1=1644868&r2=1644869&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Fri Dec 12 13:24:01 2014
@@ -80,6 +80,7 @@ import org.apache.hadoop.hive.ql.plan.Mo
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
/**
* SparkCompiler translates the operator plan into SparkTasks.
*
@@ -272,6 +273,12 @@ public class SparkCompiler extends TaskC
physicalCtx = new SplitSparkWorkResolver().resolve(physicalCtx);
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN)) {
+ (new SparkSkewJoinResolver()).resolve(physicalCtx);
+ } else {
+ LOG.debug("Skipping runtime skew join optimization");
+ }
+
physicalCtx = new SparkMapJoinResolver().resolve(physicalCtx);
if (conf.getBoolVar(HiveConf.ConfVars.HIVENULLSCANOPTIMIZE)) {
@@ -303,13 +310,6 @@ public class SparkCompiler extends TaskC
} else {
LOG.debug("Skipping stage id rearranger");
}
-
- if (conf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN)) {
- // TODO: enable after HIVE-8913 is done
- //(new SparkSkewJoinResolver()).resolve(physicalCtx);
- } else {
- LOG.debug("Skipping runtime skew join optimization");
- }
return;
}
}
Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join_stats.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join_stats.q.out?rev=1644869&r1=1644868&r2=1644869&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join_stats.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join_stats.q.out Fri Dec 12 13:24:01 2014
@@ -182,8 +182,8 @@ POSTHOOK: query: explain select src1.key
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-2 is a root stage
- Stage-1 depends on stages: Stage-2, Stage-3
Stage-3 depends on stages: Stage-2
+ Stage-1 depends on stages: Stage-3
Stage-0 depends on stages: Stage-1
STAGE PLANS:
@@ -209,6 +209,28 @@ STAGE PLANS:
Local Work:
Map Reduce Local Work
+ Stage: Stage-3
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 3
+ Map Operator Tree:
+ TableScan
+ alias: src2
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Spark HashTable Sink Operator
+ condition expressions:
+ 0 {key}
+ 1
+ keys:
+ 0 key (type: string)
+ 1 key (type: string)
+ Local Work:
+ Map Reduce Local Work
+
Stage: Stage-1
Spark
Edges:
@@ -301,28 +323,6 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- Stage: Stage-3
- Spark
-#### A masked pattern was here ####
- Vertices:
- Map 3
- Map Operator Tree:
- TableScan
- alias: src2
- Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
- Filter Operator
- predicate: key is not null (type: boolean)
- Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
- Spark HashTable Sink Operator
- condition expressions:
- 0 {key}
- 1
- keys:
- 0 key (type: string)
- 1 key (type: string)
- Local Work:
- Map Reduce Local Work
-
Stage: Stage-0
Fetch Operator
limit: -1
Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out?rev=1644869&r1=1644868&r2=1644869&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out Fri Dec 12 13:24:01 2014
@@ -314,8 +314,8 @@ join tab b on a.k1 = b.key
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-2 is a root stage
- Stage-1 depends on stages: Stage-2, Stage-3
Stage-3 depends on stages: Stage-2
+ Stage-1 depends on stages: Stage-3
Stage-0 depends on stages: Stage-1
STAGE PLANS:
@@ -341,6 +341,28 @@ STAGE PLANS:
Local Work:
Map Reduce Local Work
+ Stage: Stage-3
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 3
+ Map Operator Tree:
+ TableScan
+ alias: tab
+ Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+ Spark HashTable Sink Operator
+ condition expressions:
+ 0
+ 1 {value}
+ keys:
+ 0 key (type: int)
+ 1 key (type: int)
+ Local Work:
+ Map Reduce Local Work
+
Stage: Stage-1
Spark
Edges:
@@ -421,28 +443,6 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- Stage: Stage-3
- Spark
-#### A masked pattern was here ####
- Vertices:
- Map 3
- Map Operator Tree:
- TableScan
- alias: tab
- Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
- Filter Operator
- predicate: key is not null (type: boolean)
- Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
- Spark HashTable Sink Operator
- condition expressions:
- 0
- 1 {value}
- keys:
- 0 key (type: int)
- 1 key (type: int)
- Local Work:
- Map Reduce Local Work
-
Stage: Stage-0
Fetch Operator
limit: -1
Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out?rev=1644869&r1=1644868&r2=1644869&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out Fri Dec 12 13:24:01 2014
@@ -80,7 +80,10 @@ INSERT OVERWRITE TABLE dest_j1 SELECT sr
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
- Stage-0 depends on stages: Stage-1
+ Stage-4 depends on stages: Stage-1 , consists of Stage-5, Stage-0
+ Stage-5
+ Stage-3 depends on stages: Stage-5
+ Stage-0 depends on stages: Stage-3
Stage-2 depends on stages: Stage-0
STAGE PLANS:
@@ -125,6 +128,7 @@ STAGE PLANS:
condition expressions:
0 {KEY.reducesinkkey0}
1 {VALUE._col0}
+ handleSkewJoin: true
outputColumnNames: _col0, _col6
Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
Select Operator
@@ -140,6 +144,56 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest_j1
+ Stage: Stage-4
+ Conditional Operator
+
+ Stage: Stage-5
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 5
+ Map Operator Tree:
+ TableScan
+ Spark HashTable Sink Operator
+ condition expressions:
+ 0 {0_VALUE_0}
+ 1 {1_VALUE_0}
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-3
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 4
+ Map Operator Tree:
+ TableScan
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {0_VALUE_0}
+ 1 {1_VALUE_0}
+ keys:
+ 0 reducesinkkey0 (type: string)
+ 1 reducesinkkey0 (type: string)
+ outputColumnNames: _col0, _col6
+ Select Operator
+ expressions: UDFToInteger(_col0) (type: int), _col6 (type: string)
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.dest_j1
+ Local Work:
+ Map Reduce Local Work
+
Stage: Stage-0
Move Operator
tables: