You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/14 06:15:51 UTC
svn commit: r1639569 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql:
exec/spark/HashTableLoader.java
optimizer/physical/SparkMapJoinResolver.java plan/MapredLocalWork.java
Author: xuefu
Date: Fri Nov 14 05:15:49 2014
New Revision: 1639569
URL: http://svn.apache.org/r1639569
Log:
HIVE-8776: Generate MapredLocalWork in SparkMapJoinResolver [Spark Branch] (Chao via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java?rev=1639569&r1=1639568&r2=1639569&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java Fri Nov 14 05:15:49 2014
@@ -76,7 +76,7 @@ public class HashTableLoader implements
}
// All HashTables share the same base dir,
// which is passed in as the tmp path
- Path baseDir = localWork.getTmpHDFSPath();
+ Path baseDir = localWork.getTmpPath();
if (baseDir == null) {
return;
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java?rev=1639569&r1=1639568&r2=1639569&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java Fri Nov 14 05:15:49 2014
@@ -20,24 +20,31 @@ package org.apache.hadoop.hive.ql.optimi
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.SparkWork;
@@ -120,7 +127,7 @@ public class SparkMapJoinResolver implem
// update this information in sparkWorkMap
sparkWorkMap.put(work, parentWork);
for (BaseWork parent : parentWorks) {
- if (containsOp(parent, HashTableSinkOperator.class)) {
+ if (containsOp(parent, SparkHashTableSinkOperator.class)) {
moveWork(sparkWork, parent, parentWork);
} else {
moveWork(sparkWork, parent, targetWork);
@@ -129,6 +136,46 @@ public class SparkMapJoinResolver implem
}
}
+ private void generateLocalWork(SparkTask originalTask) {
+ SparkWork originalWork = originalTask.getWork();
+ Collection<BaseWork> allBaseWorks = originalWork.getAllWorkUnsorted();
+
+ for (BaseWork work : allBaseWorks) {
+ if (containsOp(work, SparkHashTableSinkOperator.class) ||
+ containsOp(work, MapJoinOperator.class)) {
+ work.setMapRedLocalWork(new MapredLocalWork());
+ }
+ }
+
+ Context ctx = physicalContext.getContext();
+
+ for (BaseWork work : allBaseWorks) {
+ if (containsOp(work, MapJoinOperator.class)) {
+ Path tmpPath = Utilities.generateTmpPath(ctx.getMRTmpPath(), originalTask.getId());
+ MapredLocalWork bigTableLocalWork = work.getMapRedLocalWork();
+ List<Operator<? extends OperatorDesc>> dummyOps =
+ new ArrayList<Operator<? extends OperatorDesc>>(work.getDummyOps());
+ bigTableLocalWork.setDummyParentOp(dummyOps);
+
+ for (BaseWork parentWork : originalWork.getParents(work)) {
+ if (containsOp(parentWork,SparkHashTableSinkOperator.class)) {
+ parentWork.getMapRedLocalWork().setTmpHDFSPath(tmpPath);
+ parentWork.getMapRedLocalWork().setDummyParentOp(
+ new ArrayList<Operator<? extends OperatorDesc>>());
+ }
+ }
+
+ bigTableLocalWork.setAliasToWork(
+ new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
+ bigTableLocalWork.setAliasToFetchWork(new LinkedHashMap<String, FetchWork>());
+ bigTableLocalWork.setTmpPath(tmpPath);
+
+ // TODO: set inputFileChangeSensitive and BucketMapjoinContext,
+ // TODO: enable non-staged mapjoin
+ }
+ }
+ }
+
// Create a new SparkTask for the specified SparkWork, recursively compute
// all the parent SparkTasks that this new task depends on, if they don't already exist.
private SparkTask createSparkTask(SparkTask originalTask,
@@ -167,7 +214,11 @@ public class SparkMapJoinResolver implem
throws SemanticException {
Task<? extends Serializable> currentTask = (Task<? extends Serializable>) nd;
if (currentTask instanceof SparkTask) {
- SparkWork sparkWork = ((SparkTask) currentTask).getWork();
+ SparkTask sparkTask = (SparkTask) currentTask;
+ SparkWork sparkWork = sparkTask.getWork();
+
+ // Generate MapredLocalWorks for MJ and HTS
+ generateLocalWork(sparkTask);
dependencyGraph.put(sparkWork, new ArrayList<SparkWork>());
Set<BaseWork> leaves = sparkWork.getLeaves();
@@ -187,7 +238,7 @@ public class SparkMapJoinResolver implem
// Now create SparkTasks from the SparkWorks, also set up dependency
for (SparkWork work : dependencyGraph.keySet()) {
- createSparkTask((SparkTask)currentTask, work, createdTaskMap);
+ createSparkTask(sparkTask, work, createdTaskMap);
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java?rev=1639569&r1=1639568&r2=1639569&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java Fri Nov 14 05:15:49 2014
@@ -45,7 +45,7 @@ public class MapredLocalWork implements
private BucketMapJoinContext bucketMapjoinContext;
private Path tmpPath;
private String stageID;
- // Temp HDFS path for Spark HashTable sink and loader
+ // Temp HDFS path for Spark HashTable sink
private Path tmpHDFSPath;
private List<Operator<? extends OperatorDesc>> dummyParentOp;