You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/09/25 04:59:53 UTC

svn commit: r1627449 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: GraphTran.java IdentityTran.java SparkPlan.java SparkPlanGenerator.java UnionTran.java

Author: xuefu
Date: Thu Sep 25 02:59:52 2014
New Revision: 1627449

URL: http://svn.apache.org/r1627449
Log:
HIVE-8249: Refactoring SparkPlan and SparkPlanGenerator [Spark Branch] (Chao via Xuefu)

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java
Removed:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java?rev=1627449&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java Thu Sep 25 02:59:52 2014
@@ -0,0 +1,32 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class IdentityTran implements SparkTran<HiveKey, HiveKey> {
+
+  @Override
+  public JavaPairRDD<HiveKey, BytesWritable> transform(
+      JavaPairRDD<HiveKey, BytesWritable> input) {
+    return input;
+  }
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1627449&r1=1627448&r2=1627449&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java Thu Sep 25 02:59:52 2014
@@ -18,19 +18,30 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+import java.util.List;
+
 public class SparkPlan {
 
-  private GraphTran tran;
+  List<JavaPairRDD<HiveKey, BytesWritable>> rdds;
 
-  public void execute() throws Exception {
-    tran.execute();
+  public SparkPlan(List<JavaPairRDD<HiveKey, BytesWritable>> rs) {
+    rdds = rs;
   }
 
-  public void setTran(GraphTran tran) {
-    this.tran = tran;
+  public void execute() throws Exception {
+    JavaPairRDD<HiveKey, BytesWritable> finalRDD = null;
+    for (JavaPairRDD<HiveKey, BytesWritable> rdd : rdds) {
+      if (finalRDD == null) {
+        finalRDD = rdd;
+      } else {
+        finalRDD = finalRDD.union(rdd);
+      }
+      finalRDD.foreach(HiveVoidFunction.getInstance());
+    }
   }
 
-  public GraphTran getTran() {
-    return tran;
-  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1627449&r1=1627448&r2=1627449&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Thu Sep 25 02:59:52 2014
@@ -18,27 +18,25 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
@@ -67,8 +65,7 @@ public class SparkPlanGenerator {
   private final JobConf jobConf;
   private Context context;
   private Path scratchDir;
-  //used to make sure parents join to the same children, in case of Union and Reduce-side join.
-  private Map<BaseWork, SparkTran> childWorkTrans = new HashMap<BaseWork, SparkTran>();
+  private Map<BaseWork, JavaPairRDD<HiveKey, BytesWritable>> workToRDDMap;
 
   public SparkPlanGenerator(JavaSparkContext sc, Context context,
       JobConf jobConf, Path scratchDir) {
@@ -76,94 +73,48 @@ public class SparkPlanGenerator {
     this.context = context;
     this.jobConf = jobConf;
     this.scratchDir = scratchDir;
+    this.workToRDDMap = new HashMap<BaseWork, JavaPairRDD<HiveKey, BytesWritable>>();
   }
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
-    SparkPlan plan = new SparkPlan();
-    GraphTran trans = new GraphTran();
-    Set<BaseWork> roots = sparkWork.getRoots();
-    for (BaseWork w : roots) {
-      if (!(w instanceof MapWork)) {
-        throw new Exception(
-            "The roots in the SparkWork must be MapWork instances!");
-      }
-      MapWork mapWork = (MapWork) w;
-      JobConf newJobConf = cloneJobConf(mapWork);
-      JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(newJobConf, mapWork);
-      SparkTran tran = generate(newJobConf, mapWork);
-      trans.addRootTranWithInput(tran, input);
-
-      while (sparkWork.getChildren(w).size() > 0) {
-        BaseWork child = sparkWork.getChildren(w).get(0);
-        SparkTran childTran = childWorkTrans.get(child);
-        if (child instanceof ReduceWork) {
-          ReduceTran rt = null;
-          if (((ReduceWork) child).getReducer() instanceof JoinOperator) {
-            // Reduce-side join operator: The strategy to insert a UnionTran (UT) to union the output
-            // of the two separate input map-trans (MT), which are then shuffled to the appropriate partition
-            // for the ReduceTran (RT).
-
-            // Before:    MW   MW
-            //             \   /
-            //              RW (JoinOperator)
-
-            // After:     MT   MT
-            //             \   /
-            //              UT
-            //              |
-            //              RT (JoinOperator)
-            if (childTran == null) {
-              //create a new UT, and put it in the map.
-              rt = generateRTWithEdge(sparkWork, w, child);
-              UnionTran ut = generateUnionTran();
-              trans.connect(tran, ut);
-              trans.connect(ut, rt);
-              childWorkTrans.put(child, ut);
-            } else {
-              //already a UT in the map, connect
-              trans.connect(tran, childTran);
-              break;
-            }
-          } else {
-            rt = generateRTWithEdge(sparkWork, w, child);
-            trans.connect(tran, rt);
-          }
-          w = child;
-          tran = rt;
-        } else if (child instanceof UnionWork) {
-          if (childTran == null) {
-            SparkTran ut = generateUnionTran();
-            childWorkTrans.put(child, ut);
-            trans.connect(tran, ut);
-            w = child;
-            tran = ut;
-          } else {
-            trans.connect(tran, childTran);
-            break;
-          }
+    List<JavaPairRDD<HiveKey, BytesWritable>> resultRDDs = new ArrayList<JavaPairRDD<HiveKey, BytesWritable>>();
+    for (BaseWork bw : sparkWork.getLeaves()) {
+      resultRDDs.add(generate(sparkWork, bw));
+    }
+
+    return new SparkPlan(resultRDDs);
+  }
+
+  private JavaPairRDD<HiveKey, BytesWritable> generate(SparkWork sparkWork, BaseWork work) throws Exception {
+    // TODO: we should cache this RDD, because it's used by more than one child
+    if (workToRDDMap.containsKey(work)) {
+      return workToRDDMap.get(work);
+    }
+    List<BaseWork> parentWorks = sparkWork.getParents(work);
+    JavaPairRDD<HiveKey, BytesWritable> rdd;
+    SparkEdgeProperty edge = null;
+    if (parentWorks.size() == 0) {
+      Preconditions.checkArgument(work instanceof MapWork,
+          "AssertionError: a work with no parent should be a MapWork.");
+      MapWork mapWork = (MapWork) work;
+      rdd = generateRDD(mapWork);
+    } else {
+      rdd = null;
+      edge = sparkWork.getEdgeProperty(parentWorks.get(0), work);
+      for (BaseWork parentWork : parentWorks) {
+        JavaPairRDD<HiveKey, BytesWritable> parentRDD = generate(sparkWork, parentWork);
+        if (rdd == null) {
+           rdd = parentRDD;
+        } else {
+          rdd = rdd.union(parentRDD);
         }
       }
+      workToRDDMap.put(work, rdd);
     }
-    childWorkTrans.clear();
-    plan.setTran(trans);
-    return plan;
-  }
+    SparkTran tran = generate(work, edge);
+    rdd = tran.transform(rdd);
 
-  private ReduceTran generateRTWithEdge(SparkWork sparkWork, BaseWork parent, BaseWork child) throws Exception {
-    SparkEdgeProperty edge = sparkWork.getEdgeProperty(parent, child);
-    SparkShuffler st = generate(edge);
-    ReduceTran rt = generate((ReduceWork) child);
-    rt.setShuffler(st);
-    rt.setNumPartitions(edge.getNumPartitions());
-    return rt;
-  }
-
-  private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(JobConf jobConf, MapWork mapWork)
-      throws Exception {
-    Class ifClass = getInputFormat(jobConf, mapWork);
-
-    return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class,
-        Writable.class);
+    return rdd;
   }
 
   private Class getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
@@ -195,27 +146,31 @@ public class SparkPlanGenerator {
     return inputFormatClass;
   }
 
-  private MapTran generate(JobConf jobConf, MapWork mw) throws Exception {
-    // Create tmp dir for MergeFileWork
-    if (mw instanceof MergeFileWork) {
-      Path outputPath = ((MergeFileWork) mw).getOutputDir();
-      Path tempOutPath = Utilities.toTempPath(outputPath);
-      FileSystem fs = outputPath.getFileSystem(jobConf);
-      try {
-        if (!fs.exists(tempOutPath)) {
-          fs.mkdirs(tempOutPath);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException(
-            "Can't make path " + outputPath + " : " + e.getMessage());
-      }
+  public SparkTran generate(BaseWork work, SparkEdgeProperty edge) throws Exception {
+    if (work instanceof MapWork) {
+      MapWork mw = (MapWork) work;
+      return generate(mw);
+    } else if (work instanceof ReduceWork) {
+      ReduceWork rw = (ReduceWork) work;
+      ReduceTran tran = generate(rw);
+      SparkShuffler shuffler = generate(edge);
+      tran.setShuffler(shuffler);
+      tran.setNumPartitions(edge.getNumPartitions());
+      return tran;
+    } else if (work instanceof UnionWork) {
+      return new IdentityTran();
+    } else {
+      throw new HiveException("Unexpected work: " + work.getName());
     }
-    initStatsPublisher(mw);
-    MapTran result = new MapTran();
-    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
-    HiveMapFunction mapFunc = new HiveMapFunction(confBytes);
-    result.setMapFunction(mapFunc);
-    return result;
+  }
+
+  private JavaPairRDD<HiveKey, BytesWritable> generateRDD(MapWork mapWork)
+      throws Exception {
+    JobConf jobConf = cloneJobConf(mapWork);
+    Class ifClass = getInputFormat(jobConf, mapWork);
+
+    return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class,
+        Writable.class);
   }
 
   private SparkShuffler generate(SparkEdgeProperty edge) {
@@ -227,6 +182,16 @@ public class SparkPlanGenerator {
     return new GroupByShuffler();
   }
 
+  private MapTran generate(MapWork mw) throws Exception {
+    initStatsPublisher(mw);
+    MapTran result = new MapTran();
+    JobConf newJobConf = cloneJobConf(mw);
+    byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
+    HiveMapFunction mapFunc = new HiveMapFunction(confBytes);
+    result.setMapFunction(mapFunc);
+    return result;
+  }
+
   private ReduceTran generate(ReduceWork rw) throws Exception {
     ReduceTran result = new ReduceTran();
     JobConf newJobConf = cloneJobConf(rw);
@@ -236,11 +201,6 @@ public class SparkPlanGenerator {
     return result;
   }
 
-  private UnionTran generateUnionTran() {
-    UnionTran result = new UnionTran();
-    return result;
-  }
-
   private JobConf cloneJobConf(BaseWork work) throws Exception {
     JobConf cloned = new JobConf(jobConf);
     // Make sure we'll use a different plan path from the original one