You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@hive.apache.org by sz...@apache.org on 2014/08/14 19:53:32 UTC
svn commit: r1618002 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: HiveMapFunction.java HiveMapFunctionResultList.java SparkMapRecordHandler.java
Author: szehon
Date: Thu Aug 14 17:53:32 2014
New Revision: 1618002
URL: http://svn.apache.org/r1618002
Log:
HIVE-7707 : Optimize SparkMapRecordHandler implementation (Chengxiang Li via Szehon) [Spark Branch]
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1618002&r1=1618001&r2=1618002&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java Thu Aug 14 17:53:32 2014
@@ -22,6 +22,7 @@ import java.util.Iterator;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
@@ -45,10 +46,12 @@ BytesWritable, BytesWritable> {
jobConf = KryoSerializer.deserializeJobConf(this.buffer);
}
- SparkMapRecordHandler mapper = new SparkMapRecordHandler();
- mapper.configure(jobConf);
+ SparkMapRecordHandler mapRecordHandler = new SparkMapRecordHandler();
+ HiveMapFunctionResultList result = new HiveMapFunctionResultList(jobConf, it, mapRecordHandler);
+ //TODO we need to implement a Spark specified Reporter to collect stats, refer to HIVE-7709.
+ mapRecordHandler.init(jobConf, result, Reporter.NULL);
- return new HiveMapFunctionResultList(jobConf, it, mapper);
+ return result;
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java?rev=1618002&r1=1618001&r2=1618002&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java Thu Aug 14 17:53:32 2014
@@ -44,7 +44,7 @@ public class HiveMapFunctionResultList e
@Override
protected void processNextRecord(Tuple2<BytesWritable, BytesWritable> inputRecord)
throws IOException {
- recordHandler.map(inputRecord._1(), inputRecord._2(), this, Reporter.NULL);
+ recordHandler.process(inputRecord._2());
}
@Override
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1618002&r1=1618001&r2=1618002&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java Thu Aug 14 17:53:32 2014
@@ -77,7 +77,7 @@ public class SparkMapRecordHandler {
private final ExecMapperContext execContext = new ExecMapperContext();
- public void configure(JobConf job) {
+ public void init(JobConf job, OutputCollector output, Reporter reporter) {
// Allocate the bean at the beginning -
memoryMXBean = ManagementFactory.getMemoryMXBean();
l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
@@ -128,6 +128,12 @@ public class SparkMapRecordHandler {
mo.initializeLocalWork(jc);
mo.initialize(jc, null);
+ oc = output;
+ rp = reporter;
+ OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
+ mo.setReporter(rp);
+ MapredContext.get().setReporter(reporter);
+
if (localWork == null) {
return;
}
@@ -152,15 +158,7 @@ public class SparkMapRecordHandler {
}
}
- public void map(Object key, Object value, OutputCollector output,
- Reporter reporter) throws IOException {
- if (oc == null) {
- oc = output;
- rp = reporter;
- OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
- mo.setReporter(rp);
- MapredContext.get().setReporter(reporter);
- }
+ public void process(Object value) throws IOException {
// reset the execContext for each new row
execContext.resetRow();