Posted to commits@hive.apache.org by br...@apache.org on 2014/08/19 23:01:57 UTC
svn commit: r1618986 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/spark/HiveReduceFunction.java exec/spark/SparkPlanGenerator.java io/IOContext.java
Author: brock
Date: Tue Aug 19 21:01:57 2014
New Revision: 1618986
URL: http://svn.apache.org/r1618986
Log:
HIVE-7773 - Union all query finished with errors [Spark Branch] (Rui Li via Brock)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
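
For context, HIVE-7773 reports a union-all query failing on the Spark branch.
Judging from the hunks below, such a query compiles into a SparkWork with
several root MapWorks, and before this patch those works shared pieces of
JobConf setup (notably the serialized plan path), so one branch could clobber
another's. A minimal sketch of the kind of statement that exercises the
problem, using hypothetical table names and the standard Hive Driver API
(session setup omitted for brevity):

    // Sketch only: src1/src2 are hypothetical tables. Each UNION ALL branch
    // becomes its own MapWork under a single SparkWork -- the plan shape
    // this commit fixes. SessionState.start(conf) would be needed first.
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Driver;

    public class UnionAllRepro {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        Driver driver = new Driver(conf);
        driver.run("SELECT key FROM src1 UNION ALL SELECT key FROM src2");
      }
    }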
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1618986&r1=1618985&r2=1618986&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java Tue Aug 19 21:01:57 2014
@@ -45,7 +45,6 @@ BytesWritable, BytesWritable> {
call(Iterator<Tuple2<BytesWritable,Iterable<BytesWritable>>> it) throws Exception {
if (jobConf == null) {
jobConf = KryoSerializer.deserializeJobConf(this.buffer);
- jobConf.set("mapred.reducer.class", ExecReducer.class.getName());
}
SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler();
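
The removed line here is relocated rather than dropped: setting
"mapred.reducer.class" now happens once at plan-generation time, inside the
new SparkPlanGenerator.cloneJobConf (see the hunks below), so the JobConf that
HiveReduceFunction deserializes already names ExecReducer. A hedged sketch of
the invariant the reduce side can now assume (buffer and KryoSerializer as in
the code above):

    // Sketch only: after this patch the deserialized conf should already
    // carry the reducer class, set by cloneJobConf on the driver side.
    JobConf jobConf = KryoSerializer.deserializeJobConf(this.buffer);
    assert ExecReducer.class.getName()
        .equals(jobConf.get("mapred.reducer.class"));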
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1618986&r1=1618985&r2=1618986&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Tue Aug 19 21:01:57 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -80,8 +81,9 @@ public class SparkPlanGenerator {
"The roots in the SparkWork must be MapWork instances!");
}
MapWork mapWork = (MapWork) w;
- SparkTran tran = generate(w);
- JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(mapWork);
+ JobConf newJobConf = cloneJobConf(mapWork);
+ SparkTran tran = generate(newJobConf, mapWork);
+ JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(newJobConf, mapWork);
trans.addTranWithInput(tran, input);
while (sparkWork.getChildren(w).size() > 0) {
@@ -116,18 +118,11 @@ public class SparkPlanGenerator {
return plan;
}
- private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(MapWork mapWork)
+ private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(JobConf jobConf, MapWork mapWork)
throws Exception {
- JobConf newJobConf = new JobConf(jobConf);
- List<Path> inputPaths = Utilities.getInputPaths(newJobConf, mapWork,
- scratchDir, context, false);
- Utilities.setInputPaths(newJobConf, inputPaths);
- Utilities.setMapWork(newJobConf, mapWork, scratchDir, true);
Class ifClass = getInputFormat(mapWork);
- // The mapper class is expected by the HiveInputFormat.
- newJobConf.set("mapred.mapper.class", ExecMapper.class.getName());
- return sc.hadoopRDD(newJobConf, ifClass, WritableComparable.class,
+ return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class,
Writable.class);
}
@@ -159,43 +154,10 @@ public class SparkPlanGenerator {
return inputFormatClass;
}
- private SparkTran generate(BaseWork bw) throws Exception {
- // initialize stats publisher if necessary
- if (bw.isGatheringStats()) {
- StatsPublisher statsPublisher;
- StatsFactory factory = StatsFactory.newFactory(jobConf);
- if (factory != null) {
- statsPublisher = factory.getStatsPublisher();
- if (!statsPublisher.init(jobConf)) { // creating stats table if not exists
- if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
- throw new HiveException(
- ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
- }
- }
- }
- }
- if (bw instanceof MapWork) {
- return generate((MapWork) bw);
- } else if (bw instanceof ReduceWork) {
- return generate((ReduceWork) bw);
- } else {
- throw new IllegalArgumentException(
- "Only MapWork and ReduceWork are expected");
- }
- }
-
- private MapTran generate(MapWork mw) throws Exception {
- JobConf newJobConf = new JobConf(jobConf);
+ private MapTran generate(JobConf jobConf, MapWork mw) throws Exception {
+ initStatsPublisher(mw);
MapTran result = new MapTran();
-
- List<Path> inputPaths = Utilities.getInputPaths(newJobConf, mw,
- scratchDir, context, false);
- Utilities.setInputPaths(newJobConf, inputPaths);
-
- Utilities.setMapWork(newJobConf, mw, scratchDir, true);
- Utilities.createTmpDirs(newJobConf, mw);
- newJobConf.set("mapred.mapper.class", ExecMapper.class.getName());
- byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
+ byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
HiveMapFunction mapFunc = new HiveMapFunction(confBytes);
result.setMapFunction(mapFunc);
return result;
@@ -210,14 +172,9 @@ public class SparkPlanGenerator {
return new GroupByShuffler();
}
- private ReduceTran generate(ReduceWork rw) throws IOException {
+ private ReduceTran generate(ReduceWork rw) throws Exception {
ReduceTran result = new ReduceTran();
- // Clone jobConf for each ReduceWork so we can have multiple of them
- JobConf newJobConf = new JobConf(jobConf);
- // Make sure we'll use a different plan path from the original one
- HiveConf.setVar(newJobConf, HiveConf.ConfVars.PLAN, "");
- Utilities.setReduceWork(newJobConf, rw, scratchDir, false);
- Utilities.createTmpDirs(newJobConf, rw);
+ JobConf newJobConf = cloneJobConf(rw);
byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
HiveReduceFunction redFunc = new HiveReduceFunction(confBytes);
result.setReduceFunction(redFunc);
@@ -229,4 +186,39 @@ public class SparkPlanGenerator {
return result;
}
+ private JobConf cloneJobConf(BaseWork work) throws Exception {
+ JobConf cloned = new JobConf(jobConf);
+ // Make sure we'll use a different plan path from the original one
+ HiveConf.setVar(cloned, HiveConf.ConfVars.PLAN, "");
+ if (work instanceof MapWork) {
+ List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork) work, scratchDir, context, false);
+ Utilities.setInputPaths(cloned, inputPaths);
+ Utilities.setMapWork(cloned, (MapWork) work, scratchDir, false);
+ Utilities.createTmpDirs(cloned, (MapWork) work);
+ cloned.set("mapred.mapper.class", ExecMapper.class.getName());
+ } else if (work instanceof ReduceWork) {
+ Utilities.setReduceWork(cloned, (ReduceWork) work, scratchDir, false);
+ Utilities.createTmpDirs(cloned, (ReduceWork) work);
+ cloned.set("mapred.reducer.class", ExecReducer.class.getName());
+ }
+ return cloned;
+ }
+
+ private void initStatsPublisher(BaseWork work) throws HiveException {
+ // initialize stats publisher if necessary
+ if (work.isGatheringStats()) {
+ StatsPublisher statsPublisher;
+ StatsFactory factory = StatsFactory.newFactory(jobConf);
+ if (factory != null) {
+ statsPublisher = factory.getStatsPublisher();
+ if (!statsPublisher.init(jobConf)) { // creating stats table if not exists
+ if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
+ throw new HiveException(
+ ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
+ }
+ }
+ }
+ }
+ }
+
}
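
The net effect of the SparkPlanGenerator changes is a single cloneJobConf
entry point: every BaseWork gets its own JobConf clone with the plan path
(HiveConf.ConfVars.PLAN) blanked, so several MapWorks in one SparkWork, as a
union-all plan produces, no longer overwrite each other's serialized plans.
A hedged caller-side sketch assembled from the hunks above (the surrounding
loop is illustrative, not verbatim):

    // For each root MapWork, clone the conf first, then derive both the
    // transformation and the input RDD from that same clone, keeping the
    // plan path and mapper class consistent between them.
    MapWork mapWork = (MapWork) w;
    JobConf newJobConf = cloneJobConf(mapWork);  // resets ConfVars.PLAN to ""
    SparkTran tran = generate(newJobConf, mapWork);
    JavaPairRDD<BytesWritable, BytesWritable> input =
        generateRDD(newJobConf, mapWork);
    trans.addTranWithInput(tran, input);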
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1618986&r1=1618985&r2=1618986&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Tue Aug 19 21:01:57 2014
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.io;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.session.SessionState;
/**
@@ -38,19 +37,12 @@ public class IOContext {
protected synchronized IOContext initialValue() { return new IOContext(); }
};
- private static IOContext ioContext = new IOContext();
-
public static IOContext get() {
- if (SessionState.get() == null) {
- // this happens on the backend. only one io context needed.
- return ioContext;
- }
return IOContext.threadLocal.get();
}
public static void clear() {
IOContext.threadLocal.remove();
- ioContext = new IOContext();
}
long currentBlockStart;
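
The IOContext change plausibly complements the above: on Spark, one executor
JVM runs many tasks concurrently, so the old static fallback instance (a
single IOContext for the whole backend) could be shared across tasks. After
this patch, get() always returns the thread-local copy. A hedged sketch of
the resulting behaviour:

    // Sketch only: each task thread now sees its own IOContext.
    Runnable task = new Runnable() {
      public void run() {
        IOContext ctx = IOContext.get(); // thread-local, never shared
        // ... a record reader would update ctx (e.g. currentBlockStart) ...
        IOContext.clear();               // discard this thread's instance
      }
    };
    new Thread(task).start();
    new Thread(task).start();            // observes a distinct IOContext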