Posted to commits@hive.apache.org by sz...@apache.org on 2014/08/12 08:24:00 UTC
svn commit: r1617420 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql:
exec/mr/ExecReducer.java exec/spark/HiveReduceFunctionResultList.java
exec/spark/SparkPlanGenerator.java plan/SparkEdgeProperty.java
Author: szehon
Date: Tue Aug 12 06:24:00 2014
New Revision: 1617420
URL: http://svn.apache.org/r1617420
Log:
HIVE-7626 : Reduce operator initialization failed when running multiple MR queries on Spark (Rui Li via Szehon) [Spark Branch]
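
As the new comments in SparkPlanGenerator.generate(ReduceWork) below indicate, serializing every ReduceWork through the single shared JobConf means each reduce stage reuses the same plan path, so with more than one reduce stage the serialized plans clobber each other and reduce operator initialization fails. The fix clones the JobConf per ReduceWork and blanks out HiveConf.ConfVars.PLAN so each stage gets its own plan path. The following self-contained sketch only illustrates that idea; FakeConf and its setReduceWork() helper are simplified stand-ins for JobConf and Utilities.setReduceWork(), not the real Hive/Hadoop APIs, and the key and path names are illustrative.

import java.util.HashMap;
import java.util.Map;

public class PlanPathDemo {

  // Stand-in for a Hadoop JobConf: just a copyable key/value map.
  static class FakeConf {
    final Map<String, String> props = new HashMap<>();
    FakeConf() {}
    FakeConf(FakeConf other) { props.putAll(other.props); } // clone, like new JobConf(jobConf)
  }

  // Stand-in for Utilities.setReduceWork(): picks a plan path only if the conf
  // does not already carry one, then records the plan there.
  static void setReduceWork(FakeConf conf, String reduceWorkName) {
    String path = conf.props.get("hive.exec.plan");
    if (path == null || path.isEmpty()) {
      path = "/tmp/plan-" + reduceWorkName;   // fresh path only when the conf has none yet
      conf.props.put("hive.exec.plan", path);
    }
    System.out.println(reduceWorkName + " serialized to " + path);
  }

  public static void main(String[] args) {
    // Before the fix: both stages share one conf, so the second stage silently
    // reuses the first stage's plan path and overwrites its plan.
    FakeConf shared = new FakeConf();
    setReduceWork(shared, "ReduceWork-1");
    setReduceWork(shared, "ReduceWork-2");

    // After the fix: clone the conf per ReduceWork and clear the plan path first,
    // mirroring `new JobConf(jobConf)` + `HiveConf.setVar(newJobConf, ConfVars.PLAN, "")`.
    FakeConf base = new FakeConf();
    for (String rw : new String[] {"ReduceWork-1", "ReduceWork-2"}) {
      FakeConf perStage = new FakeConf(base);
      perStage.props.put("hive.exec.plan", "");
      setReduceWork(perStage, rw);
    }
  }
}

Running the sketch prints both "before" stages landing on the same /tmp/plan-ReduceWork-1 path, while the "after" loop produces a distinct path per stage, which mirrors what the cloned-conf change in the diff achieves.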
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1617420&r1=1617419&r2=1617420&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Tue Aug 12 06:24:00 2014
@@ -321,4 +321,8 @@ public class ExecReducer extends MapRedu
Utilities.clearWorkMap();
}
}
+
+ public Operator<?> getReducer() {
+ return reducer;
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java?rev=1617420&r1=1617419&r2=1617420&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java Tue Aug 12 06:24:00 2014
@@ -18,12 +18,16 @@
package org.apache.hadoop.hive.ql.exec.spark;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.Reporter;
import scala.Tuple2;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Iterator;
public class HiveReduceFunctionResultList extends
@@ -41,6 +45,7 @@ public class HiveReduceFunctionResultLis
ExecReducer reducer) {
super(conf, inputIterator);
this.reducer = reducer;
+ setOutputCollector();
}
@Override
@@ -58,4 +63,11 @@ public class HiveReduceFunctionResultLis
protected void closeRecordProcessor() {
reducer.close();
}
+
+ private void setOutputCollector() {
+ if (reducer != null && reducer.getReducer() != null) {
+ OperatorUtils.setChildrenCollector(
+ Arrays.<Operator<? extends OperatorDesc>>asList(reducer.getReducer()), this);
+ }
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1617420&r1=1617419&r2=1617420&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Tue Aug 12 06:24:00 2014
@@ -47,7 +47,7 @@ import org.apache.spark.api.java.JavaSpa
public class SparkPlanGenerator {
private JavaSparkContext sc;
- private JobConf jobConf;
+ private final JobConf jobConf;
private Context context;
private Path scratchDir;
@@ -86,7 +86,6 @@ public class SparkPlanGenerator {
private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(MapWork mapWork) throws Exception {
List<Path> inputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, context, false);
Utilities.setInputPaths(jobConf, inputPaths);
- Utilities.setMapWork(jobConf, mapWork, scratchDir, true);
Class ifClass = HiveInputFormat.class;
// The mapper class is expected by the HiveInputFormat.
@@ -119,7 +118,7 @@ public class SparkPlanGenerator {
private MapTran generate(MapWork mw) throws IOException {
MapTran result = new MapTran();
- Utilities.setMapWork(jobConf, mw, scratchDir, true);
+ Utilities.setMapWork(jobConf, mw, scratchDir, false);
Utilities.createTmpDirs(jobConf, mw);
jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
@@ -137,11 +136,15 @@ public class SparkPlanGenerator {
private ReduceTran generate(ReduceWork rw) throws IOException {
ReduceTran result = new ReduceTran();
- Utilities.setReduceWork(jobConf, rw, scratchDir, true);
- Utilities.createTmpDirs(jobConf, rw);
- byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
- HiveReduceFunction mapFunc = new HiveReduceFunction(confBytes);
- result.setReduceFunction(mapFunc);
+ // Clone jobConf for each ReduceWork so we can have multiple of them
+ JobConf newJobConf = new JobConf(jobConf);
+ // Make sure we'll use a different plan path from the original one
+ HiveConf.setVar(newJobConf, HiveConf.ConfVars.PLAN, "");
+ Utilities.setReduceWork(newJobConf, rw, scratchDir, false);
+ Utilities.createTmpDirs(newJobConf, rw);
+ byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
+ HiveReduceFunction redFunc = new HiveReduceFunction(confBytes);
+ result.setReduceFunction(redFunc);
return result;
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java?rev=1617420&r1=1617419&r2=1617420&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java Tue Aug 12 06:24:00 2014
@@ -26,7 +26,7 @@ public class SparkEdgeProperty {
public static long SHUFFLE_SORT = 2; // Shuffle, keys are sorted
private long edgeType;
-
+
private int numPartitions;
public SparkEdgeProperty(long edgeType, int numPartitions) {