You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/09/25 04:59:53 UTC
svn commit: r1627449 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark:
GraphTran.java IdentityTran.java SparkPlan.java SparkPlanGenerator.java
UnionTran.java
Author: xuefu
Date: Thu Sep 25 02:59:52 2014
New Revision: 1627449
URL: http://svn.apache.org/r1627449
Log:
HIVE-8249: Refactoring SparkPlan and SparkPlanGenerator [Spark Branch] (Chao via Xuefu)
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java
Removed:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java?rev=1627449&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java Thu Sep 25 02:59:52 2014
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+/**
+ * A {@link SparkTran} that performs no processing: {@link #transform} hands back the RDD it is
+ * given.
+ *
+ * <p>{@code SparkPlanGenerator} returns this transform for a {@code UnionWork}; the parent RDDs
+ * of that work are unioned before the transform is applied, so nothing is left to do here.
+ */
+public class IdentityTran implements SparkTran<HiveKey, HiveKey> {
+
+  /**
+   * Passes the input through untouched; no new RDD is created.
+   *
+   * @param input the RDD to pass through
+   * @return the very same RDD instance
+   */
+  @Override
+  public JavaPairRDD<HiveKey, BytesWritable> transform(JavaPairRDD<HiveKey, BytesWritable> input) {
+    return input;
+  }
+}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1627449&r1=1627448&r2=1627449&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java Thu Sep 25 02:59:52 2014
@@ -18,19 +18,30 @@
package org.apache.hadoop.hive.ql.exec.spark;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+import java.util.List;
+
public class SparkPlan {
- private GraphTran tran;
+  /**
+   * RDDs to execute. {@code SparkPlanGenerator#generate(SparkWork)} passes one RDD per leaf
+   * work. The list is stored by reference, not copied.
+   */
+  List<JavaPairRDD<HiveKey, BytesWritable>> rdds;
- public void execute() throws Exception {
- tran.execute();
+  /**
+   * @param rs the RDDs that {@link #execute()} will union and run
+   */
+  public SparkPlan(List<JavaPairRDD<HiveKey, BytesWritable>> rs) {
+    rdds = rs;
}
- public void setTran(GraphTran tran) {
- this.tran = tran;
+  /**
+   * Unions all RDDs of this plan and runs a single Spark action over the union.
+   *
+   * <p>A plan with no RDDs does nothing.
+   */
+  public void execute() throws Exception {
+    JavaPairRDD<HiveKey, BytesWritable> finalRDD = null;
+    for (JavaPairRDD<HiveKey, BytesWritable> rdd : rdds) {
+      finalRDD = (finalRDD == null) ? rdd : finalRDD.union(rdd);
+    }
+    // Run the action exactly once, on the complete union. Triggering it on every iteration
+    // would recompute each earlier RDD (and repeat any side effects of computing it) once per
+    // later RDD in the list.
+    if (finalRDD != null) {
+      finalRDD.foreach(HiveVoidFunction.getInstance());
+    }
}
- public GraphTran getTran() {
- return tran;
- }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1627449&r1=1627448&r2=1627449&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Thu Sep 25 02:59:52 2014
@@ -18,27 +18,25 @@
package org.apache.hadoop.hive.ql.exec.spark;
-import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
@@ -67,8 +65,7 @@ public class SparkPlanGenerator {
private final JobConf jobConf;
private Context context;
private Path scratchDir;
- //used to make sure parents join to the same children, in case of Union and Reduce-side join.
- private Map<BaseWork, SparkTran> childWorkTrans = new HashMap<BaseWork, SparkTran>();
+ private Map<BaseWork, JavaPairRDD<HiveKey, BytesWritable>> workToRDDMap;
public SparkPlanGenerator(JavaSparkContext sc, Context context,
JobConf jobConf, Path scratchDir) {
@@ -76,94 +73,48 @@ public class SparkPlanGenerator {
this.context = context;
this.jobConf = jobConf;
this.scratchDir = scratchDir;
+ this.workToRDDMap = new HashMap<BaseWork, JavaPairRDD<HiveKey, BytesWritable>>();
}
+  /**
+   * Translates {@code sparkWork} into a {@link SparkPlan}.
+   *
+   * <p>Each leaf work is turned into an RDD by recursively translating its ancestors (see
+   * {@link #generate(SparkWork, BaseWork)}); the plan receives one RDD per leaf.
+   *
+   * @param sparkWork the work graph to translate
+   * @return a plan holding the RDDs of all leaf works
+   * @throws Exception if a work cannot be translated
+   */
public SparkPlan generate(SparkWork sparkWork) throws Exception {
- SparkPlan plan = new SparkPlan();
- GraphTran trans = new GraphTran();
- Set<BaseWork> roots = sparkWork.getRoots();
- for (BaseWork w : roots) {
- if (!(w instanceof MapWork)) {
- throw new Exception(
- "The roots in the SparkWork must be MapWork instances!");
- }
- MapWork mapWork = (MapWork) w;
- JobConf newJobConf = cloneJobConf(mapWork);
- JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(newJobConf, mapWork);
- SparkTran tran = generate(newJobConf, mapWork);
- trans.addRootTranWithInput(tran, input);
-
- while (sparkWork.getChildren(w).size() > 0) {
- BaseWork child = sparkWork.getChildren(w).get(0);
- SparkTran childTran = childWorkTrans.get(child);
- if (child instanceof ReduceWork) {
- ReduceTran rt = null;
- if (((ReduceWork) child).getReducer() instanceof JoinOperator) {
- // Reduce-side join operator: The strategy to insert a UnionTran (UT) to union the output
- // of the two separate input map-trans (MT), which are then shuffled to the appropriate partition
- // for the ReduceTran (RT).
-
- // Before: MW MW
- // \ /
- // RW (JoinOperator)
-
- // After: MT MT
- // \ /
- // UT
- // |
- // RT (JoinOperator)
- if (childTran == null) {
- //create a new UT, and put it in the map.
- rt = generateRTWithEdge(sparkWork, w, child);
- UnionTran ut = generateUnionTran();
- trans.connect(tran, ut);
- trans.connect(ut, rt);
- childWorkTrans.put(child, ut);
- } else {
- //already a UT in the map, connect
- trans.connect(tran, childTran);
- break;
- }
- } else {
- rt = generateRTWithEdge(sparkWork, w, child);
- trans.connect(tran, rt);
- }
- w = child;
- tran = rt;
- } else if (child instanceof UnionWork) {
- if (childTran == null) {
- SparkTran ut = generateUnionTran();
- childWorkTrans.put(child, ut);
- trans.connect(tran, ut);
- w = child;
- tran = ut;
- } else {
- trans.connect(tran, childTran);
- break;
- }
+    // Start from an empty memo table so RDDs built by an earlier call are never reused.
+    workToRDDMap.clear();
+    List<JavaPairRDD<HiveKey, BytesWritable>> resultRDDs =
+        new ArrayList<JavaPairRDD<HiveKey, BytesWritable>>();
+    for (BaseWork bw : sparkWork.getLeaves()) {
+      resultRDDs.add(generate(sparkWork, bw));
+    }
+
+    return new SparkPlan(resultRDDs);
+  }
+
+  /**
+   * Returns the output RDD of {@code work}, translating it (and, recursively, its parents) on
+   * first use.
+   *
+   * <p>A work without parents must be a {@link MapWork}; its input RDD comes from
+   * {@link #generateRDD(MapWork)}. Otherwise the output RDDs of all parents are unioned. In both
+   * cases the work's own {@link SparkTran} is then applied.
+   *
+   * <p>The result is memoized <em>after</em> the transform is applied. As a result, a work with
+   * several children is translated once, and every child consumes this work's output rather
+   * than its input.
+   *
+   * @param sparkWork the graph that {@code work} belongs to
+   * @param work the work to translate
+   * @return the RDD produced by {@code work}
+   * @throws IllegalArgumentException if a work without parents is not a {@link MapWork}
+   * @throws Exception if translation fails
+   */
+  private JavaPairRDD<HiveKey, BytesWritable> generate(SparkWork sparkWork, BaseWork work)
+      throws Exception {
+    if (workToRDDMap.containsKey(work)) {
+      return workToRDDMap.get(work);
+    }
+    List<BaseWork> parentWorks = sparkWork.getParents(work);
+    JavaPairRDD<HiveKey, BytesWritable> rdd;
+    SparkEdgeProperty edge = null;
+    if (parentWorks.isEmpty()) {
+      Preconditions.checkArgument(work instanceof MapWork,
+          "AssertionError: a work with no parent should be a MapWork.");
+      MapWork mapWork = (MapWork) work;
+      rdd = generateRDD(mapWork);
+    } else {
+      rdd = null;
+      // NOTE(review): only the edge from the first parent is used for the whole work; this
+      // assumes all incoming edges share the same shuffle settings -- confirm for joins.
+      edge = sparkWork.getEdgeProperty(parentWorks.get(0), work);
+      for (BaseWork parentWork : parentWorks) {
+        JavaPairRDD<HiveKey, BytesWritable> parentRDD = generate(sparkWork, parentWork);
+        rdd = (rdd == null) ? parentRDD : rdd.union(parentRDD);
}
}
- childWorkTrans.clear();
- plan.setTran(trans);
- return plan;
- }
+    SparkTran tran = generate(work, edge);
+    rdd = tran.transform(rdd);
+    // Memoize the transformed output (not the parents' union), so that a second request for
+    // this work from another child yields this work's result.
+    // TODO: persist this RDD when it feeds more than one child; otherwise Spark may recompute
+    // its lineage for each consumer.
+    workToRDDMap.put(work, rdd);
- private ReduceTran generateRTWithEdge(SparkWork sparkWork, BaseWork parent, BaseWork child) throws Exception {
- SparkEdgeProperty edge = sparkWork.getEdgeProperty(parent, child);
- SparkShuffler st = generate(edge);
- ReduceTran rt = generate((ReduceWork) child);
- rt.setShuffler(st);
- rt.setNumPartitions(edge.getNumPartitions());
- return rt;
- }
-
- private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(JobConf jobConf, MapWork mapWork)
- throws Exception {
- Class ifClass = getInputFormat(jobConf, mapWork);
-
- return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class,
- Writable.class);
+    return rdd;
}
private Class getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
@@ -195,27 +146,31 @@ public class SparkPlanGenerator {
return inputFormatClass;
}
- private MapTran generate(JobConf jobConf, MapWork mw) throws Exception {
- // Create tmp dir for MergeFileWork
- if (mw instanceof MergeFileWork) {
- Path outputPath = ((MergeFileWork) mw).getOutputDir();
- Path tempOutPath = Utilities.toTempPath(outputPath);
- FileSystem fs = outputPath.getFileSystem(jobConf);
- try {
- if (!fs.exists(tempOutPath)) {
- fs.mkdirs(tempOutPath);
- }
- } catch (IOException e) {
- throw new RuntimeException(
- "Can't make path " + outputPath + " : " + e.getMessage());
- }
+  /**
+   * Creates the {@link SparkTran} that applies {@code work} to its input RDD.
+   *
+   * @param work the work to translate: a MapWork, ReduceWork or UnionWork
+   * @param edge the incoming edge; it is only read for a ReduceWork, where it supplies the
+   *     shuffler and the number of partitions
+   * @return a MapTran, a ReduceTran wired with its shuffler, or an IdentityTran for a UnionWork
+   * @throws HiveException if {@code work} is of any other type
+   */
+  public SparkTran generate(BaseWork work, SparkEdgeProperty edge) throws Exception {
+    if (work instanceof MapWork) {
+      return generate((MapWork) work);
+    }
+    if (work instanceof ReduceWork) {
+      ReduceTran reduceTran = generate((ReduceWork) work);
+      reduceTran.setShuffler(generate(edge));
+      reduceTran.setNumPartitions(edge.getNumPartitions());
+      return reduceTran;
+    }
+    if (work instanceof UnionWork) {
+      // Nothing to do: generate(SparkWork, BaseWork) has already unioned the parent RDDs.
+      return new IdentityTran();
}
- initStatsPublisher(mw);
- MapTran result = new MapTran();
- byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
- HiveMapFunction mapFunc = new HiveMapFunction(confBytes);
- result.setMapFunction(mapFunc);
- return result;
+    throw new HiveException("Unexpected work: " + work.getName());
+  }
+
+  /**
+   * Creates the input RDD of a root MapWork. The RDD reads through the work's input format,
+   * using a job configuration cloned for this work.
+   */
+  private JavaPairRDD<HiveKey, BytesWritable> generateRDD(MapWork mapWork)
+      throws Exception {
+    JobConf workConf = cloneJobConf(mapWork);
+    Class inputFormatClass = getInputFormat(workConf, mapWork);
+    return sc.hadoopRDD(workConf, inputFormatClass, WritableComparable.class, Writable.class);
}
private SparkShuffler generate(SparkEdgeProperty edge) {
@@ -227,6 +182,16 @@ public class SparkPlanGenerator {
return new GroupByShuffler();
}
+  /**
+   * Creates the {@link MapTran} for {@code mw}.
+   *
+   * <p>For a {@link MergeFileWork}, the temp directory derived from its output dir
+   * ({@code Utilities.toTempPath}) is created first if it does not exist yet. The previous
+   * {@code generate(JobConf, MapWork)} did this; without it, nothing in this class creates that
+   * directory.
+   *
+   * @param mw the map work to translate
+   * @return a MapTran whose map function carries a serialized, work-specific clone of the job
+   *     configuration
+   * @throws RuntimeException if the MergeFileWork temp directory cannot be created
+   * @throws Exception if the configuration cannot be cloned or serialized
+   */
+  private MapTran generate(MapWork mw) throws Exception {
+    if (mw instanceof MergeFileWork) {
+      Path outputPath = ((MergeFileWork) mw).getOutputDir();
+      Path tempOutPath = Utilities.toTempPath(outputPath);
+      try {
+        org.apache.hadoop.fs.FileSystem fs = outputPath.getFileSystem(jobConf);
+        if (!fs.exists(tempOutPath)) {
+          fs.mkdirs(tempOutPath);
+        }
+      } catch (java.io.IOException e) {
+        // Keep the original exception as the cause so the filesystem error is not lost.
+        throw new RuntimeException(
+            "Can't make path " + tempOutPath + " : " + e.getMessage(), e);
+      }
+    }
+    initStatsPublisher(mw);
+    MapTran result = new MapTran();
+    JobConf newJobConf = cloneJobConf(mw);
+    byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
+    HiveMapFunction mapFunc = new HiveMapFunction(confBytes);
+    result.setMapFunction(mapFunc);
+    return result;
+  }
+
private ReduceTran generate(ReduceWork rw) throws Exception {
ReduceTran result = new ReduceTran();
JobConf newJobConf = cloneJobConf(rw);
@@ -236,11 +201,6 @@ public class SparkPlanGenerator {
return result;
}
- private UnionTran generateUnionTran() {
- UnionTran result = new UnionTran();
- return result;
- }
-
private JobConf cloneJobConf(BaseWork work) throws Exception {
JobConf cloned = new JobConf(jobConf);
// Make sure we'll use a different plan path from the original one