Posted to commits@hive.apache.org by xu...@apache.org on 2014/09/30 16:21:01 UTC
svn commit: r1628450 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark:
SparkPlan.java SparkPlanGenerator.java
Author: xuefu
Date: Tue Sep 30 14:21:00 2014
New Revision: 1628450
URL: http://svn.apache.org/r1628450
Log:
HIVE-8278: Restoring a graph representation of SparkPlan [Spark Branch] (Chao via Xuefu)
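
For context: this change replaces SparkPlan's flat list of result RDDs with an
explicit graph of SparkTran nodes, built via addInput() and connect() and walked
in topological order by execute(). A minimal usage sketch of the new API follows;
the SparkTran instances and the map input RDD would normally come from
SparkPlanGenerator, and the class and parameter names here are illustrative,
not part of the commit:

    package org.apache.hadoop.hive.ql.exec.spark;

    import org.apache.hadoop.hive.ql.io.HiveKey;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.spark.api.java.JavaPairRDD;

    public class SparkPlanUsageSketch {
      // Wires a single map stage into a single reduce stage and runs the plan.
      public static void run(SparkTran mapTran, SparkTran reduceTran,
          JavaPairRDD<HiveKey, BytesWritable> inputRDD) {
        SparkPlan plan = new SparkPlan();
        // Register the map-side transformation with its input RDD; the tran is
        // also recorded as a leaf until something is connected below it.
        plan.addInput(mapTran, inputRDD);
        // connect() moves leaf status from parent to child and rejects a
        // duplicate edge between the same pair of trans.
        plan.connect(mapTran, reduceTran);
        // execute() visits the trans in topological order, chains transform()
        // calls, unions multi-parent inputs, and materializes the leaf RDDs.
        plan.execute();
      }
    }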
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1628450&r1=1628449&r2=1628450&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java Tue Sep 30 14:21:00 2014
@@ -22,26 +22,131 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class SparkPlan {
-  List<JavaPairRDD<HiveKey, BytesWritable>> rdds;
-
-  public SparkPlan(List<JavaPairRDD<HiveKey, BytesWritable>> rs) {
-    rdds = rs;
+  private final Set<SparkTran> leafTrans = new HashSet<SparkTran>();
+  private final Map<SparkTran, List<SparkTran>> transGraph = new HashMap<SparkTran, List<SparkTran>>();
+  private final Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran, List<SparkTran>>();
+  private final Map<SparkTran, JavaPairRDD<HiveKey, BytesWritable>> mapInputs =
+      new HashMap<SparkTran, JavaPairRDD<HiveKey, BytesWritable>>();
+
+  public void addInput(SparkTran tran, JavaPairRDD<HiveKey, BytesWritable> input) {
+    if (!mapInputs.containsKey(tran)) {
+      mapInputs.put(tran, input);
+      leafTrans.add(tran);
+      transGraph.put(tran, new LinkedList<SparkTran>());
+      invertedTransGraph.put(tran, new LinkedList<SparkTran>());
+    }
   }
-  public void execute() throws Exception {
+  public void execute() throws IllegalStateException {
+    Map<SparkTran, JavaPairRDD<HiveKey, BytesWritable>> tranToRDDMap
+        = new HashMap<SparkTran, JavaPairRDD<HiveKey, BytesWritable>>();
+    for (SparkTran tran : getAllTrans()) {
+      JavaPairRDD<HiveKey, BytesWritable> rdd = null;
+      if (mapInputs.containsKey(tran)) {
+        rdd = mapInputs.get(tran);
+      } else {
+        // A non-root tran must have at least one parent, whose output becomes its input.
+        for (SparkTran parentTran : getParents(tran)) {
+          JavaPairRDD<HiveKey, BytesWritable> prevRDD = tranToRDDMap.get(parentTran);
+          if (rdd == null) {
+            rdd = prevRDD;
+          } else {
+            rdd = rdd.union(prevRDD);
+          }
+        }
+      }
+      rdd = tran.transform(rdd);
+      tranToRDDMap.put(tran, rdd);
+    }
+
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = null;
-    for (JavaPairRDD<HiveKey, BytesWritable> rdd : rdds) {
+    for (SparkTran leafTran : leafTrans) {
+      JavaPairRDD<HiveKey, BytesWritable> rdd = tranToRDDMap.get(leafTran);
       if (finalRDD == null) {
         finalRDD = rdd;
       } else {
         finalRDD = finalRDD.union(rdd);
       }
-      finalRDD.foreach(HiveVoidFunction.getInstance());
     }
+    finalRDD.foreach(HiveVoidFunction.getInstance());
+  }
+
+  /**
+   * Returns a topologically sorted list of this plan's SparkTrans, with
+   * every transformation appearing after all of its ancestors.
+   */
+  private List<SparkTran> getAllTrans() {
+    List<SparkTran> result = new LinkedList<SparkTran>();
+    Set<SparkTran> seen = new HashSet<SparkTran>();
+
+    for (SparkTran leaf : leafTrans) {
+      // make sure all leaves are visited at least once
+      visit(leaf, seen, result);
+    }
+
+    return result;
+  }
+
+  private void visit(SparkTran child, Set<SparkTran> seen, List<SparkTran> result) {
+    if (seen.contains(child)) {
+      // don't visit multiple times
+      return;
+    }
+
+    seen.add(child);
+
+    for (SparkTran parent : getParents(child)) {
+      if (!seen.contains(parent)) {
+        visit(parent, seen, result);
+      }
+    }
+
+    // post-order insertion: every ancestor of child is already in the list
+    result.add(child);
+  }
+
+  /**
+   * Connects the two SparkTrans in the graph. Does not allow multiple
+   * connections between the same pair of SparkTrans.
+   * @param parent the upstream transformation
+   * @param child the downstream transformation consuming parent's output
+   */
+  public void connect(SparkTran parent, SparkTran child) throws IllegalStateException {
+    if (getChildren(parent).contains(child)) {
+      throw new IllegalStateException("Connection already exists");
+    }
+    leafTrans.remove(parent);
+    leafTrans.add(child);
+    if (transGraph.get(child) == null) {
+      transGraph.put(child, new LinkedList<SparkTran>());
+    }
+    if (invertedTransGraph.get(child) == null) {
+      invertedTransGraph.put(child, new LinkedList<SparkTran>());
+    }
+    transGraph.get(parent).add(child);
+    invertedTransGraph.get(child).add(parent);
+  }
+
+  public List<SparkTran> getParents(SparkTran tran) throws IllegalStateException {
+    if (!invertedTransGraph.containsKey(tran)
+        || invertedTransGraph.get(tran) == null) {
+      throw new IllegalStateException("Cannot get parent transformations for " + tran);
+    }
+    return new LinkedList<SparkTran>(invertedTransGraph.get(tran));
+  }
+
+  public List<SparkTran> getChildren(SparkTran tran) throws IllegalStateException {
+    if (!transGraph.containsKey(tran) || transGraph.get(tran) == null) {
+      throw new IllegalStateException("Cannot get children transformations for " + tran);
+    }
+    return new LinkedList<SparkTran>(transGraph.get(tran));
   }
 }
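
The ordering contract of getAllTrans() above is what execute() relies on: a
depth-first, post-order walk from each leaf through getParents() emits every
node only after all of its ancestors. A self-contained sketch of the same
algorithm on plain strings (hypothetical stage names, not the Hive classes),
run on a diamond-shaped plan:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class TopoSortSketch {
      // child -> parents, mirroring SparkPlan's invertedTransGraph
      private static final Map<String, List<String>> PARENTS =
          new HashMap<String, List<String>>();

      private static void visit(String node, Set<String> seen, List<String> result) {
        if (seen.contains(node)) {
          return; // visit each node only once
        }
        seen.add(node);
        List<String> parents = PARENTS.get(node);
        if (parents != null) {
          for (String parent : parents) {
            visit(parent, seen, result);
          }
        }
        result.add(node); // post-order: every ancestor is already in the list
      }

      public static void main(String[] args) {
        // Diamond-shaped plan: one map feeds two shuffles, which feed one reduce.
        PARENTS.put("shuffle1", Arrays.asList("map"));
        PARENTS.put("shuffle2", Arrays.asList("map"));
        PARENTS.put("reduce", Arrays.asList("shuffle1", "shuffle2"));

        List<String> order = new LinkedList<String>();
        visit("reduce", new HashSet<String>(), order);
        System.out.println(order); // prints [map, shuffle1, shuffle2, reduce]
      }
    }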
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1628450&r1=1628449&r2=1628450&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Tue Sep 30 14:21:00 2014
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -65,7 +64,6 @@ public class SparkPlanGenerator {
   private final JobConf jobConf;
   private Context context;
   private Path scratchDir;
-  private Map<BaseWork, JavaPairRDD<HiveKey, BytesWritable>> workToRDDMap;
 
   public SparkPlanGenerator(JavaSparkContext sc, Context context,
       JobConf jobConf, Path scratchDir) {
@@ -73,48 +71,30 @@ public class SparkPlanGenerator {
     this.context = context;
     this.jobConf = jobConf;
     this.scratchDir = scratchDir;
-    this.workToRDDMap = new HashMap<BaseWork, JavaPairRDD<HiveKey, BytesWritable>>();
   }
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
-    List<JavaPairRDD<HiveKey, BytesWritable>> resultRDDs = new ArrayList<JavaPairRDD<HiveKey, BytesWritable>>();
-    for (BaseWork bw : sparkWork.getLeaves()) {
-      resultRDDs.add(generate(sparkWork, bw));
-    }
-
-    return new SparkPlan(resultRDDs);
-  }
-
-  private JavaPairRDD<HiveKey, BytesWritable> generate(SparkWork sparkWork, BaseWork work) throws Exception {
-    // TODO: we should cache this RDD, because it's used by more than one child
-    if (workToRDDMap.containsKey(work)) {
-      return workToRDDMap.get(work);
-    }
-    List<BaseWork> parentWorks = sparkWork.getParents(work);
-    JavaPairRDD<HiveKey, BytesWritable> rdd;
-    SparkEdgeProperty edge = null;
-    if (parentWorks.size() == 0) {
-      Preconditions.checkArgument(work instanceof MapWork,
-          "AssertionError: a work with no parent should be a MapWork.");
-      MapWork mapWork = (MapWork) work;
-      rdd = generateRDD(mapWork);
-    } else {
-      rdd = null;
-      edge = sparkWork.getEdgeProperty(parentWorks.get(0), work);
-      for (BaseWork parentWork : parentWorks) {
-        JavaPairRDD<HiveKey, BytesWritable> parentRDD = generate(sparkWork, parentWork);
-        if (rdd == null) {
-          rdd = parentRDD;
-        } else {
-          rdd = rdd.union(parentRDD);
+    SparkPlan result = new SparkPlan();
+    Map<BaseWork, SparkTran> createdTransMap = new HashMap<BaseWork, SparkTran>();
+
+    for (BaseWork work : sparkWork.getAllWork()) {
+      SparkTran tran;
+      if (work instanceof MapWork) {
+        JavaPairRDD<HiveKey, BytesWritable> inputRDD = generateRDD((MapWork)work);
+        tran = generate(work, null);
+        result.addInput(tran, inputRDD);
+      } else {
+        List<BaseWork> parentWorks = sparkWork.getParents(work);
+        tran = generate(work, sparkWork.getEdgeProperty(parentWorks.get(0), work));
+        for (BaseWork parentWork : parentWorks) {
+          SparkTran parentTran = createdTransMap.get(parentWork);
+          result.connect(parentTran, tran);
         }
       }
-      workToRDDMap.put(work, rdd);
+      createdTransMap.put(work, tran);
     }
-    SparkTran tran = generate(work, edge);
-    rdd = tran.transform(rdd);
-    return rdd;
+    return result;
   }
   private Class getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
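
One subtlety in the new generate(): the single pass over sparkWork.getAllWork()
only works if getAllWork() returns works in topological order, so that
createdTransMap already holds a SparkTran for every parent by the time connect()
is called. A sketch of a defensive check that could be added inside the parent
loop (not part of this commit; Preconditions is Guava's, which the pre-change
code already used, and BaseWork.getName() is assumed for the message):

    for (BaseWork parentWork : parentWorks) {
      SparkTran parentTran = createdTransMap.get(parentWork);
      // getAllWork() must yield parents before children; fail fast otherwise.
      Preconditions.checkState(parentTran != null,
          "No SparkTran generated yet for parent work " + parentWork.getName());
      result.connect(parentTran, tran);
    }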