You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/09/30 16:21:01 UTC

svn commit: r1628450 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: SparkPlan.java SparkPlanGenerator.java

Author: xuefu
Date: Tue Sep 30 14:21:00 2014
New Revision: 1628450

URL: http://svn.apache.org/r1628450
Log:
HIVE-8278: Restoring a graph representation of SparkPlan [Spark Branch] (Chao via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1628450&r1=1628449&r2=1628450&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java Tue Sep 30 14:21:00 2014
@@ -22,26 +22,131 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class SparkPlan {
 
-  List<JavaPairRDD<HiveKey, BytesWritable>> rdds;
-
-  public SparkPlan(List<JavaPairRDD<HiveKey, BytesWritable>> rs) {
-    rdds = rs;
+  private final Set<SparkTran> leafTrans = new HashSet<SparkTran>();
+  private final Map<SparkTran, List<SparkTran>> transGraph = new HashMap<SparkTran, List<SparkTran>>();
+  private final Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran, List<SparkTran>>();
+  private final Map<SparkTran, JavaPairRDD<HiveKey, BytesWritable>> mapInputs =
+      new HashMap<SparkTran, JavaPairRDD<HiveKey, BytesWritable>>();
+
+  /**
+   * Registers {@code tran} as a root transformation that reads from {@code input}.
+   * The first registration wins: if {@code tran} already has an input, the call
+   * changes nothing.
+   *
+   * @param tran the transformation to register
+   * @param input the RDD handed to {@code tran} when the plan is executed
+   */
+  public void addInput(SparkTran tran, JavaPairRDD<HiveKey, BytesWritable> input) {
+    if (mapInputs.containsKey(tran)) {
+      return;
+    }
+    mapInputs.put(tran, input);
+    // a newly registered tran starts with empty child/parent lists and is tracked as a leaf
+    leafTrans.add(tran);
+    transGraph.put(tran, new LinkedList<SparkTran>());
+    invertedTransGraph.put(tran, new LinkedList<SparkTran>());
   }
 
-  public void execute() throws Exception {
+  /**
+   * Builds one RDD per transformation, walking the graph in the order returned by
+   * getAllTrans(), then unions the RDDs of all leaf transformations and calls
+   * foreach on the result. A plan without any transformation is a no-op.
+   *
+   * @throws IllegalStateException if a transformation is missing from the graph
+   */
+  public void execute() throws IllegalStateException {
+    Map<SparkTran, JavaPairRDD<HiveKey, BytesWritable>> tranToRDDMap
+        = new HashMap<SparkTran, JavaPairRDD<HiveKey, BytesWritable>>();
+    for (SparkTran tran : getAllTrans()) {
+      JavaPairRDD<HiveKey, BytesWritable> rdd = null;
+      if (mapInputs.containsKey(tran)) {
+        rdd = mapInputs.get(tran);
+      } else {
+        // a non-root tran, it must have a previous input:
+        // its input is the union of the RDDs produced by all of its parents
+        for (SparkTran parentTran : getParents(tran)) {
+          JavaPairRDD<HiveKey, BytesWritable> prevRDD = tranToRDDMap.get(parentTran);
+          if (rdd == null) {
+            rdd = prevRDD;
+          } else {
+            rdd = rdd.union(prevRDD);
+          }
+        }
+      }
+      rdd = tran.transform(rdd);
+      tranToRDDMap.put(tran, rdd);
+    }
+
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = null;
-    for (JavaPairRDD<HiveKey, BytesWritable> rdd : rdds) {
+    for (SparkTran leafTran : leafTrans) {
+      JavaPairRDD<HiveKey, BytesWritable> rdd = tranToRDDMap.get(leafTran);
       if (finalRDD == null) {
         finalRDD = rdd;
       } else {
         finalRDD = finalRDD.union(rdd);
       }
-      finalRDD.foreach(HiveVoidFunction.getInstance());
     }
+    // finalRDD is still null when there are no leaf transformations (empty plan);
+    // skip the call instead of failing with a NullPointerException
+    if (finalRDD != null) {
+      finalRDD.foreach(HiveVoidFunction.getInstance());
+    }
+  }
+
+  /**
+   * Collects every SparkTran reachable from the current leaves, ordered so that
+   * each transformation appears after all of its parents (topological order).
+   */
+  private List<SparkTran> getAllTrans() {
+    Set<SparkTran> visited = new HashSet<SparkTran>();
+    List<SparkTran> sorted = new LinkedList<SparkTran>();
+
+    // walk upwards from each leaf; visit() skips anything already handled
+    for (SparkTran leafTran : leafTrans) {
+      visit(leafTran, visited, sorted);
+    }
+
+    return sorted;
+  }
+
+  /**
+   * Depth-first helper for getAllTrans(): appends the not-yet-seen ancestors of
+   * {@code child} to {@code result}, followed by {@code child} itself.
+   *
+   * @param child the transformation to visit
+   * @param seen transformations already visited; updated in place
+   * @param result the ordered output list; updated in place
+   */
+  private void visit(SparkTran child, Set<SparkTran> seen, List<SparkTran> result) {
+    if (!seen.add(child)) {
+      // already visited through another path
+      return;
+    }
+
+    for (SparkTran ancestor : getParents(child)) {
+      // the recursive call returns immediately for trans that were already seen
+      visit(ancestor, seen, result);
+    }
+
+    // all parents are in the result by now, so the child can follow them
+    result.add(child);
+  }
+
+  /**
+   * Connects the two SparkTrans in the graph.  Does not allow multiple connections
+   * between the same pair of SparkTrans.
+   *
+   * After the call the parent is no longer a leaf. The child is tracked as a leaf
+   * only while it has no children of its own, so edges may be added in any order.
+   *
+   * @param parent the upstream transformation; must already be part of the graph
+   * @param child the downstream transformation; added to the graph if missing
+   * @throws IllegalStateException if the parent is not part of the graph or the
+   *         connection already exists
+   */
+  public void connect(SparkTran parent, SparkTran child) throws IllegalStateException {
+    if (getChildren(parent).contains(child)) {
+      throw new IllegalStateException("Connection already exists");
+    }
+    if (transGraph.get(child) == null) {
+      transGraph.put(child, new LinkedList<SparkTran>());
+    }
+    if (invertedTransGraph.get(child) == null) {
+      invertedTransGraph.put(child, new LinkedList<SparkTran>());
+    }
+    transGraph.get(parent).add(child);
+    invertedTransGraph.get(child).add(parent);
+
+    leafTrans.remove(parent);
+    // a child that was connected to its own children earlier is not a leaf
+    if (transGraph.get(child).isEmpty()) {
+      leafTrans.add(child);
+    }
+  }
+
+  /**
+   * Returns a copy of the list of transformations that feed into {@code tran}.
+   *
+   * @param tran the transformation to look up
+   * @return a new list holding the parents of {@code tran}; empty if it has none
+   * @throws IllegalStateException if {@code tran} is not part of the graph
+   */
+  public List<SparkTran> getParents(SparkTran tran) throws IllegalStateException {
+    List<SparkTran> parents = invertedTransGraph.get(tran);
+    // a missing key and a null value are both reported the same way
+    if (parents == null) {
+      throw new IllegalStateException("Cannot get parent transformations for " + tran);
+    }
+    return new LinkedList<SparkTran>(parents);
+  }
+
+  /**
+   * Returns a copy of the list of transformations that consume the output of {@code tran}.
+   *
+   * @param tran the transformation to look up
+   * @return a new list holding the children of {@code tran}; empty if it has none
+   * @throws IllegalStateException if {@code tran} is not part of the graph
+   */
+  public List<SparkTran> getChildren(SparkTran tran) throws IllegalStateException {
+    List<SparkTran> children = transGraph.get(tran);
+    // a missing key and a null value are both reported the same way
+    if (children == null) {
+      throw new IllegalStateException("Cannot get children transformations for " + tran);
+    }
+    return new LinkedList<SparkTran>(children);
   }
 
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1628450&r1=1628449&r2=1628450&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Tue Sep 30 14:21:00 2014
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -65,7 +64,6 @@ public class SparkPlanGenerator {
   private final JobConf jobConf;
   private Context context;
   private Path scratchDir;
-  private Map<BaseWork, JavaPairRDD<HiveKey, BytesWritable>> workToRDDMap;
 
   public SparkPlanGenerator(JavaSparkContext sc, Context context,
       JobConf jobConf, Path scratchDir) {
@@ -73,48 +71,30 @@ public class SparkPlanGenerator {
     this.context = context;
     this.jobConf = jobConf;
     this.scratchDir = scratchDir;
-    this.workToRDDMap = new HashMap<BaseWork, JavaPairRDD<HiveKey, BytesWritable>>();
   }
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
-    List<JavaPairRDD<HiveKey, BytesWritable>> resultRDDs = new ArrayList<JavaPairRDD<HiveKey, BytesWritable>>();
-    for (BaseWork bw : sparkWork.getLeaves()) {
-      resultRDDs.add(generate(sparkWork, bw));
-    }
-
-    return new SparkPlan(resultRDDs);
-  }
-
-  private JavaPairRDD<HiveKey, BytesWritable> generate(SparkWork sparkWork, BaseWork work) throws Exception {
-    // TODO: we should cache this RDD, because it's used by more than one child
-    if (workToRDDMap.containsKey(work)) {
-      return workToRDDMap.get(work);
-    }
-    List<BaseWork> parentWorks = sparkWork.getParents(work);
-    JavaPairRDD<HiveKey, BytesWritable> rdd;
-    SparkEdgeProperty edge = null;
-    if (parentWorks.size() == 0) {
-      Preconditions.checkArgument(work instanceof MapWork,
-          "AssertionError: a work with no parent should be a MapWork.");
-      MapWork mapWork = (MapWork) work;
-      rdd = generateRDD(mapWork);
-    } else {
-      rdd = null;
-      edge = sparkWork.getEdgeProperty(parentWorks.get(0), work);
-      for (BaseWork parentWork : parentWorks) {
-        JavaPairRDD<HiveKey, BytesWritable> parentRDD = generate(sparkWork, parentWork);
-        if (rdd == null) {
-           rdd = parentRDD;
-        } else {
-          rdd = rdd.union(parentRDD);
+    // Builds the SparkPlan graph: one SparkTran per work. A MapWork becomes a root
+    // with its own input RDD; any other work is connected to the trans of its parents.
+    // NOTE(review): relies on getAllWork() returning parents before their children
+    // (presumably topological order) -- confirm against SparkWork.
+    SparkPlan result = new SparkPlan();
+    Map<BaseWork, SparkTran> createdTransMap = new HashMap<BaseWork, SparkTran>();
+
+    for (BaseWork work : sparkWork.getAllWork()) {
+      SparkTran tran;
+      if (work instanceof MapWork) {
+        JavaPairRDD<HiveKey, BytesWritable> inputRDD = generateRDD((MapWork)work);
+        tran = generate(work, null);
+        result.addInput(tran, inputRDD);
+      } else {
+        List<BaseWork> parentWorks = sparkWork.getParents(work);
+        // fail with a clear message instead of an IndexOutOfBoundsException from get(0)
+        Preconditions.checkArgument(!parentWorks.isEmpty(),
+            "AssertionError: a work that is not a MapWork should have at least one parent.");
+        // the edge property is taken from the first parent only
+        tran = generate(work, sparkWork.getEdgeProperty(parentWorks.get(0), work));
+        for (BaseWork parentWork : parentWorks) {
+          SparkTran parentTran = createdTransMap.get(parentWork);
+          // a missing entry means the parent work has not been processed yet
+          Preconditions.checkState(parentTran != null,
+              "AssertionError: a parent work should be processed before its child.");
+          result.connect(parentTran, tran);
         }
       }
-      workToRDDMap.put(work, rdd);
+      createdTransMap.put(work, tran);
     }
-    SparkTran tran = generate(work, edge);
-    rdd = tran.transform(rdd);
 
-    return rdd;
+    return result;
   }
 
   private Class getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {