Posted to commits@hive.apache.org by sz...@apache.org on 2014/08/13 20:25:07 UTC
svn commit: r1617790 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/spark/ parse/spark/
Author: szehon
Date: Wed Aug 13 18:25:06 2014
New Revision: 1617790
URL: http://svn.apache.org/r1617790
Log:
HIVE-7541 : Support union all on Spark (Na Yang via Szehon) [Spark Branch]
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
Removed:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java?rev=1617790&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java Wed Aug 13 18:25:06 2014
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class GraphTran {
+
+  private Set<SparkTran> rootTrans = new HashSet<SparkTran>();
+  private Set<SparkTran> leafTrans = new HashSet<SparkTran>();
+  private Map<SparkTran, List<SparkTran>> transGraph = new HashMap<SparkTran, List<SparkTran>>();
+  private Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran, List<SparkTran>>();
+  private Map<SparkTran, List<JavaPairRDD<BytesWritable, BytesWritable>>> unionInputs = new HashMap<SparkTran, List<JavaPairRDD<BytesWritable, BytesWritable>>>();
+  private Map<SparkTran, JavaPairRDD<BytesWritable, BytesWritable>> mapInputs = new HashMap<SparkTran, JavaPairRDD<BytesWritable, BytesWritable>>();
+
+  public void addTran(SparkTran tran) {
+    addTranWithInput(tran, null);
+  }
+
+  public void addTranWithInput(SparkTran tran,
+      JavaPairRDD<BytesWritable, BytesWritable> input) {
+    if (!rootTrans.contains(tran)) {
+      rootTrans.add(tran);
+      leafTrans.add(tran);
+      transGraph.put(tran, new LinkedList<SparkTran>());
+      invertedTransGraph.put(tran, new LinkedList<SparkTran>());
+    }
+    if (input != null) {
+      mapInputs.put(tran, input);
+    }
+  }
+
+  public void execute() throws Exception {
+    JavaPairRDD<BytesWritable, BytesWritable> resultRDD = null;
+    for (SparkTran tran : rootTrans) {
+      // make sure all the root trans are MapTran
+      if (!(tran instanceof MapTran)) {
+        throw new Exception("Root transformations must be MapTran instances!");
+      }
+      JavaPairRDD<BytesWritable, BytesWritable> input = mapInputs.get(tran);
+      if (input == null) {
+        throw new Exception("Input is missing for the root transformation!");
+      }
+      JavaPairRDD<BytesWritable, BytesWritable> rdd = tran.transform(input);
+
+      while (getChildren(tran).size() > 0) {
+        SparkTran childTran = getChildren(tran).get(0);
+        if (childTran instanceof UnionTran) {
+          List<JavaPairRDD<BytesWritable, BytesWritable>> unionInputList = unionInputs
+              .get(childTran);
+          if (unionInputList == null) {
+            // process the first union input RDD, cache it in the hash map
+            unionInputList = new LinkedList<JavaPairRDD<BytesWritable, BytesWritable>>();
+            unionInputList.add(rdd);
+            unionInputs.put(childTran, unionInputList);
+            break;
+          } else if (unionInputList.size() < this.getParents(childTran).size() - 1) {
+            // not the last input RDD yet, continue caching it in the hash map
+            unionInputList.add(rdd);
+            break;
+          } else if (unionInputList.size() == this.getParents(childTran).size() - 1) {
+            // process the last input RDD: union it with all the cached ones
+            for (JavaPairRDD<BytesWritable, BytesWritable> inputRDD : unionInputList) {
+              ((UnionTran) childTran).setOtherInput(inputRDD);
+              rdd = childTran.transform(rdd);
+            }
+          }
+        } else {
+          rdd = childTran.transform(rdd);
+        }
+        tran = childTran;
+      }
+      resultRDD = rdd;
+    }
+    if (resultRDD != null) {
+      resultRDD.foreach(HiveVoidFunction.getInstance());
+    }
+  }
+
+  public void connect(SparkTran a, SparkTran b) {
+    transGraph.get(a).add(b);
+    invertedTransGraph.get(b).add(a);
+    rootTrans.remove(b);
+    leafTrans.remove(a);
+  }
+
+  public List<SparkTran> getParents(SparkTran tran) throws Exception {
+    if (!invertedTransGraph.containsKey(tran)
+        || invertedTransGraph.get(tran) == null) {
+      throw new Exception("Cannot get parent transformations for " + tran);
+    }
+    return new LinkedList<SparkTran>(invertedTransGraph.get(tran));
+  }
+
+  public List<SparkTran> getChildren(SparkTran tran) throws Exception {
+    if (!transGraph.containsKey(tran) || transGraph.get(tran) == null) {
+      throw new Exception("Cannot get children transformations for " + tran);
+    }
+    return new LinkedList<SparkTran>(transGraph.get(tran));
+  }
+
+}
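
A minimal sketch of how the new GraphTran DAG is driven for a two-branch
union all (mapTran1, mapTran2, inputRdd1 and inputRdd2 are placeholders; in
the real flow SparkPlanGenerator builds them from the SparkWork):

    GraphTran graph = new GraphTran();
    // each map-side branch of the union becomes a root with its own input RDD
    graph.addTranWithInput(mapTran1, inputRdd1);
    graph.addTranWithInput(mapTran2, inputRdd2);
    UnionTran union = new UnionTran();
    graph.addTran(union);
    graph.connect(mapTran1, union); // the union is a child of both branches
    graph.connect(mapTran2, union);
    // execute() walks from each root; earlier branches cache their RDDs in
    // unionInputs, and the last-arriving branch performs the actual union
    graph.execute();
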
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1617790&r1=1617789&r2=1617790&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Wed Aug 13 18:25:06 2014
@@ -42,7 +42,8 @@ import java.util.*;
public class SparkClient implements Serializable {
private static final long serialVersionUID = 1L;
- protected static transient final Log LOG = LogFactory.getLog(SparkClient.class);
+ protected static transient final Log LOG = LogFactory
+ .getLog(SparkClient.class);
private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
private static final String SPARK_DEFAULT_MASTER = "local";
@@ -73,12 +74,14 @@ public class SparkClient implements Seri
// set default spark configurations.
sparkConf.set("spark.master", SPARK_DEFAULT_MASTER);
sparkConf.set("spark.app.name", SAPRK_DEFAULT_APP_NAME);
- sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- sparkConf.set("spark.default.parallelism", "1");
+ sparkConf.set("spark.serializer",
+ "org.apache.spark.serializer.KryoSerializer");
+ sparkConf.set("spark.default.parallelism", "1");
// load properties from spark-defaults.conf.
InputStream inputStream = null;
try {
- inputStream = this.getClass().getClassLoader().getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
+ inputStream = this.getClass().getClassLoader()
+ .getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
if (inputStream != null) {
LOG.info("loading spark properties from:" + SPARK_DEFAULT_CONF_FILE);
Properties properties = new Properties();
@@ -87,13 +90,15 @@ public class SparkClient implements Seri
if (propertyName.startsWith("spark")) {
String value = properties.getProperty(propertyName);
sparkConf.set(propertyName, properties.getProperty(propertyName));
- LOG.info(String.format("load spark configuration from %s (%s -> %s).",
+ LOG.info(String.format(
+ "load spark configuration from %s (%s -> %s).",
SPARK_DEFAULT_CONF_FILE, propertyName, value));
}
}
}
} catch (IOException e) {
- LOG.info("Failed to open spark configuration file:" + SPARK_DEFAULT_CONF_FILE, e);
+ LOG.info("Failed to open spark configuration file:"
+ + SPARK_DEFAULT_CONF_FILE, e);
} finally {
if (inputStream != null) {
try {
@@ -112,7 +117,8 @@ public class SparkClient implements Seri
if (propertyName.startsWith("spark")) {
String value = entry.getValue();
sparkConf.set(propertyName, value);
- LOG.info(String.format("load spark configuration from hive configuration (%s -> %s).",
+ LOG.info(String.format(
+ "load spark configuration from hive configuration (%s -> %s).",
propertyName, value));
}
}
@@ -122,7 +128,7 @@ public class SparkClient implements Seri
public int execute(DriverContext driverContext, SparkWork sparkWork) {
Context ctx = driverContext.getCtx();
- HiveConf hiveConf = (HiveConf)ctx.getConf();
+ HiveConf hiveConf = (HiveConf) ctx.getConf();
refreshLocalResources(sparkWork, hiveConf);
JobConf jobConf = new JobConf(hiveConf);
@@ -138,7 +144,8 @@ public class SparkClient implements Seri
}
// Generate Spark plan
- SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir);
+ SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf,
+ emptyScratchDir);
SparkPlan plan;
try {
plan = gen.generate(sparkWork);
@@ -148,8 +155,12 @@ public class SparkClient implements Seri
}
// Execute generated plan.
- // TODO: we should catch any exception and return more meaningful error code.
- plan.execute();
+ try {
+ plan.execute();
+ } catch (Exception e) {
+ LOG.error("Error executing Spark Plan", e);
+ return 1;
+ }
return 0;
}
@@ -167,7 +178,8 @@ public class SparkClient implements Seri
}
// add added jars
- String addedJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
+ String addedJars = Utilities.getResourceFiles(conf,
+ SessionState.ResourceType.JAR);
if (StringUtils.isNotEmpty(addedJars) && StringUtils.isNotBlank(addedJars)) {
HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
addJars(addedJars);
@@ -194,16 +206,20 @@ public class SparkClient implements Seri
}
}
- //add added files
- String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
- if (StringUtils.isNotEmpty(addedFiles) && StringUtils.isNotBlank(addedFiles)) {
+ // add added files
+ String addedFiles = Utilities.getResourceFiles(conf,
+ SessionState.ResourceType.FILE);
+ if (StringUtils.isNotEmpty(addedFiles)
+ && StringUtils.isNotBlank(addedFiles)) {
HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, addedFiles);
addResources(addedFiles);
}
// add added archives
- String addedArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
- if (StringUtils.isNotEmpty(addedArchives) && StringUtils.isNotBlank(addedArchives)) {
+ String addedArchives = Utilities.getResourceFiles(conf,
+ SessionState.ResourceType.ARCHIVE);
+ if (StringUtils.isNotEmpty(addedArchives)
+ && StringUtils.isNotBlank(addedArchives)) {
HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, addedArchives);
addResources(addedArchives);
}
@@ -211,7 +227,8 @@ public class SparkClient implements Seri
private void addResources(String addedFiles) {
for (String addedFile : addedFiles.split(",")) {
- if (StringUtils.isNotEmpty(addedFile) && StringUtils.isNotBlank(addedFile)
+ if (StringUtils.isNotEmpty(addedFile)
+ && StringUtils.isNotBlank(addedFile)
&& !localFiles.contains(addedFile)) {
localFiles.add(addedFile);
sc.addFile(addedFile);
@@ -222,7 +239,7 @@ public class SparkClient implements Seri
private void addJars(String addedJars) {
for (String addedJar : addedJars.split(",")) {
if (StringUtils.isNotEmpty(addedJar) && StringUtils.isNotBlank(addedJar)
- && !localJars.contains(addedJar)) {
+ && !localJars.contains(addedJar)) {
localJars.add(addedJar);
sc.addJar(addedJar);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1617790&r1=1617789&r2=1617790&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java Wed Aug 13 18:25:06 2014
@@ -18,31 +18,19 @@
package org.apache.hadoop.hive.ql.exec.spark;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.spark.api.java.JavaPairRDD;
-
public class SparkPlan {
- private JavaPairRDD<BytesWritable, BytesWritable> input;
- private SparkTran tran;
- public void execute() {
- JavaPairRDD<BytesWritable, BytesWritable> rdd = tran.transform(input);
- rdd.foreach(HiveVoidFunction.getInstance());
- }
+ private GraphTran tran;
- public SparkTran getTran() {
- return tran;
+ public void execute() throws Exception {
+ tran.execute();
}
- public void setTran(SparkTran tran) {
+ public void setTran(GraphTran tran) {
this.tran = tran;
}
- public JavaPairRDD<BytesWritable, BytesWritable> getInput() {
- return input;
- }
-
- public void setInput(JavaPairRDD<BytesWritable, BytesWritable> input) {
- this.input = input;
+ public GraphTran getTran() {
+ return tran;
}
}
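
With this change SparkPlan becomes a thin wrapper over the DAG. A sketch of
the resulting wiring, mirroring SparkClient.execute() (sc, ctx, jobConf,
scratchDir and sparkWork are placeholders):

    SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf, scratchDir);
    SparkPlan plan = gen.generate(sparkWork); // builds a GraphTran internally
    plan.execute(); // now just delegates to GraphTran.execute()
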
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1617790&r1=1617789&r2=1617790&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Wed Aug 13 18:25:06 2014
@@ -19,8 +19,9 @@
package org.apache.hadoop.hive.ql.exec.spark;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -57,8 +59,10 @@ public class SparkPlanGenerator {
private final JobConf jobConf;
private Context context;
private Path scratchDir;
+ private Map<BaseWork, SparkTran> unionWorkTrans = new HashMap<BaseWork, SparkTran>();
- public SparkPlanGenerator(JavaSparkContext sc, Context context, JobConf jobConf, Path scratchDir) {
+ public SparkPlanGenerator(JavaSparkContext sc, Context context,
+ JobConf jobConf, Path scratchDir) {
this.sc = sc;
this.context = context;
this.jobConf = jobConf;
@@ -67,45 +71,72 @@ public class SparkPlanGenerator {
public SparkPlan generate(SparkWork sparkWork) throws Exception {
SparkPlan plan = new SparkPlan();
- List<SparkTran> trans = new ArrayList<SparkTran>();
+ GraphTran trans = new GraphTran();
Set<BaseWork> roots = sparkWork.getRoots();
- assert(roots != null && roots.size() == 1);
- BaseWork w = roots.iterator().next();
- MapWork mapWork = (MapWork) w;
- trans.add(generate(w));
- while (sparkWork.getChildren(w).size() > 0) {
- ReduceWork child = (ReduceWork) sparkWork.getChildren(w).get(0);
- SparkEdgeProperty edge = sparkWork.getEdgeProperty(w, child);
- SparkShuffler st = generate(edge);
- ReduceTran rt = generate(child);
- rt.setShuffler(st);
- rt.setNumPartitions(edge.getNumPartitions());
- trans.add(rt);
- w = child;
- }
- ChainedTran chainedTran = new ChainedTran(trans);
- plan.setTran(chainedTran);
- JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(mapWork);
- plan.setInput(input);
+ for (BaseWork w : roots) {
+ if (!(w instanceof MapWork)) {
+ throw new Exception(
+ "The roots in the SparkWork must be MapWork instances!");
+ }
+ MapWork mapWork = (MapWork) w;
+ SparkTran tran = generate(w);
+ JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(mapWork);
+ trans.addTranWithInput(tran, input);
+
+ while (sparkWork.getChildren(w).size() > 0) {
+ BaseWork child = sparkWork.getChildren(w).get(0);
+ if (child instanceof ReduceWork) {
+ SparkEdgeProperty edge = sparkWork.getEdgeProperty(w, child);
+ SparkShuffler st = generate(edge);
+ ReduceTran rt = generate((ReduceWork) child);
+ rt.setShuffler(st);
+ rt.setNumPartitions(edge.getNumPartitions());
+ trans.addTran(rt);
+ trans.connect(tran, rt);
+ w = child;
+ tran = rt;
+ } else if (child instanceof UnionWork) {
+ if (unionWorkTrans.get(child) != null) {
+ trans.connect(tran, unionWorkTrans.get(child));
+ break;
+ } else {
+ SparkTran ut = generate((UnionWork) child);
+ unionWorkTrans.put(child, ut);
+ trans.addTran(ut);
+ trans.connect(tran, ut);
+ w = child;
+ tran = ut;
+ }
+ }
+ }
+ }
+ unionWorkTrans.clear();
+ plan.setTran(trans);
return plan;
}
- private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(MapWork mapWork) throws Exception {
- List<Path> inputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, context, false);
- Utilities.setInputPaths(jobConf, inputPaths);
- Utilities.setMapWork(jobConf, mapWork, scratchDir, true);
+ private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(MapWork mapWork)
+ throws Exception {
+ JobConf newJobConf = new JobConf(jobConf);
+ List<Path> inputPaths = Utilities.getInputPaths(newJobConf, mapWork,
+ scratchDir, context, false);
+ Utilities.setInputPaths(newJobConf, inputPaths);
+ Utilities.setMapWork(newJobConf, mapWork, scratchDir, true);
Class ifClass = getInputFormat(mapWork);
// The mapper class is expected by the HiveInputFormat.
- jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
- return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
+ newJobConf.set("mapred.mapper.class", ExecMapper.class.getName());
+ return sc.hadoopRDD(newJobConf, ifClass, WritableComparable.class,
+ Writable.class);
}
private Class getInputFormat(MapWork mWork) throws HiveException {
if (mWork.getInputformat() != null) {
- HiveConf.setVar(jobConf, HiveConf.ConfVars.HIVEINPUTFORMAT, mWork.getInputformat());
+ HiveConf.setVar(jobConf, HiveConf.ConfVars.HIVEINPUTFORMAT,
+ mWork.getInputformat());
}
- String inpFormat = HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVEINPUTFORMAT);
+ String inpFormat = HiveConf.getVar(jobConf,
+ HiveConf.ConfVars.HIVEINPUTFORMAT);
if ((inpFormat == null) || (StringUtils.isBlank(inpFormat))) {
inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
}
@@ -118,7 +149,8 @@ public class SparkPlanGenerator {
try {
inputFormatClass = Class.forName(inpFormat);
} catch (ClassNotFoundException e) {
- String message = "Failed to load specified input format class:" + inpFormat;
+ String message = "Failed to load specified input format class:"
+ + inpFormat;
LOG.error(message, e);
throw new HiveException(message, e);
}
@@ -135,33 +167,36 @@ public class SparkPlanGenerator {
statsPublisher = factory.getStatsPublisher();
if (!statsPublisher.init(jobConf)) { // creating stats table if not exists
if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
- throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
+ throw new HiveException(
+ ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
}
}
}
}
if (bw instanceof MapWork) {
- return generate((MapWork)bw);
+ return generate((MapWork) bw);
} else if (bw instanceof ReduceWork) {
- return generate((ReduceWork)bw);
+ return generate((ReduceWork) bw);
} else {
- throw new IllegalArgumentException("Only MapWork and ReduceWork are expected");
+ throw new IllegalArgumentException(
+ "Only MapWork and ReduceWork are expected");
}
}
private MapTran generate(MapWork mw) throws IOException {
+ JobConf newJobConf = new JobConf(jobConf);
MapTran result = new MapTran();
- Utilities.setMapWork(jobConf, mw, scratchDir, false);
- Utilities.createTmpDirs(jobConf, mw);
- jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
- byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+ Utilities.setMapWork(newJobConf, mw, scratchDir, true);
+ Utilities.createTmpDirs(newJobConf, mw);
+ newJobConf.set("mapred.mapper.class", ExecMapper.class.getName());
+ byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
HiveMapFunction mapFunc = new HiveMapFunction(confBytes);
result.setMapFunction(mapFunc);
return result;
}
private SparkShuffler generate(SparkEdgeProperty edge) {
- if (edge.isShuffleSort()){
+ if (edge.isShuffleSort()) {
return new SortByShuffler();
}
return new GroupByShuffler();
@@ -181,4 +216,9 @@ public class SparkPlanGenerator {
return result;
}
+ private UnionTran generate(UnionWork uw) {
+ UnionTran result = new UnionTran();
+ return result;
+ }
+
}
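
Note that generateRDD() and generate(MapWork) now clone the shared JobConf
instead of mutating it. A sketch of why that matters once a SparkWork has
multiple map branches (mw1, mw2 and the inputPathsFor() helper are
hypothetical):

    // each branch gets its own conf copy, so setting the input paths and
    // map plan for one branch no longer clobbers the other branch's settings
    JobConf conf1 = new JobConf(jobConf);
    Utilities.setInputPaths(conf1, inputPathsFor(mw1));
    JobConf conf2 = new JobConf(jobConf);
    Utilities.setInputPaths(conf2, inputPathsFor(mw2));
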
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java?rev=1617790&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java Wed Aug 13 18:25:06 2014
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class UnionTran implements SparkTran {
+  JavaPairRDD<BytesWritable, BytesWritable> otherInput;
+
+  @Override
+  public JavaPairRDD<BytesWritable, BytesWritable> transform(
+      JavaPairRDD<BytesWritable, BytesWritable> input) {
+    return input.union(otherInput);
+  }
+
+  public void setOtherInput(JavaPairRDD<BytesWritable, BytesWritable> otherInput) {
+    this.otherInput = otherInput;
+  }
+
+  public JavaPairRDD<BytesWritable, BytesWritable> getOtherInput() {
+    return this.otherInput;
+  }
+}
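
UnionTran unions exactly two RDDs per transform() call, so GraphTran.execute()
invokes it once per cached branch. A sketch of the effect of that loop (rdd is
the last-arriving branch's RDD; cachedInputs stands for the list kept in
unionInputs):

    for (JavaPairRDD<BytesWritable, BytesWritable> cached : cachedInputs) {
      union.setOtherInput(cached);
      rdd = union.transform(rdd); // rdd = rdd.union(cached)
    }
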
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1617790&r1=1617789&r2=1617790&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Wed Aug 13 18:25:06 2014
@@ -230,9 +230,6 @@ public class GenSparkUtils {
}
linked = context.linkedFileSinks.get(path);
linked.add(desc);
-
- desc.setDirName(new Path(path, ""+linked.size()));
- desc.setLinkedFileSinkDesc(linked);
}
if (current instanceof UnionOperator) {