You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/13 16:09:09 UTC
svn commit: r1639356 -
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
Author: xuefu
Date: Thu Nov 13 15:09:08 2014
New Revision: 1639356
URL: http://svn.apache.org/r1639356
Log:
HIVE-8842: auto_join2.q produces incorrect tree [Spark Branch] (Chao via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java?rev=1639356&r1=1639355&r2=1639356&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java Thu Nov 13 15:09:08 2014
@@ -111,11 +111,8 @@ public class SparkMapJoinResolver implem
SparkWork parentWork =
new SparkWork(physicalContext.conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
- // Update dependency graph
- if (!dependencyGraph.containsKey(targetWork)) {
- dependencyGraph.put(targetWork, new ArrayList<SparkWork>());
- }
dependencyGraph.get(targetWork).add(parentWork);
+ dependencyGraph.put(parentWork, new ArrayList<SparkWork>());
// this work is now moved to the parentWork, thus we should
// update this information in sparkWorkMap
@@ -132,14 +129,15 @@ public class SparkMapJoinResolver implem
// Create a new SparkTask for the specified SparkWork, recursively compute
all the parent SparkTasks that this new task depends on, if they don't already exist.
- private SparkTask createSparkTask(Task<? extends Serializable> originalTask,
+ private SparkTask createSparkTask(SparkTask originalTask,
SparkWork sparkWork,
Map<SparkWork, SparkTask> createdTaskMap) {
if (createdTaskMap.containsKey(sparkWork)) {
return createdTaskMap.get(sparkWork);
}
- SparkTask resultTask = (SparkTask) TaskFactory.get(sparkWork, physicalContext.conf);
- if (dependencyGraph.get(sparkWork) != null) {
+ SparkTask resultTask = originalTask.getWork() == sparkWork ?
+ originalTask : (SparkTask) TaskFactory.get(sparkWork, physicalContext.conf);
+ if (!dependencyGraph.get(sparkWork).isEmpty()) {
for (SparkWork parentWork : dependencyGraph.get(sparkWork)) {
SparkTask parentTask = createSparkTask(originalTask, parentWork, createdTaskMap);
parentTask.addDependentTask(resultTask);
@@ -155,6 +153,8 @@ public class SparkMapJoinResolver implem
physicalContext.removeFromRootTask(originalTask);
}
}
+
+ createdTaskMap.put(sparkWork, resultTask);
return resultTask;
}
@@ -164,6 +164,8 @@ public class SparkMapJoinResolver implem
Task<? extends Serializable> currentTask = (Task<? extends Serializable>) nd;
if (currentTask instanceof SparkTask) {
SparkWork sparkWork = ((SparkTask) currentTask).getWork();
+
+ dependencyGraph.put(sparkWork, new ArrayList<SparkWork>());
Set<BaseWork> leaves = sparkWork.getLeaves();
for (BaseWork leaf : leaves) {
moveWork(sparkWork, leaf, sparkWork);
@@ -181,7 +183,7 @@ public class SparkMapJoinResolver implem
// Now create SparkTasks from the SparkWorks, also set up dependency
for (SparkWork work : dependencyGraph.keySet()) {
- createSparkTask(currentTask, work, createdTaskMap);
+ createSparkTask((SparkTask)currentTask, work, createdTaskMap);
}
}