Posted to commits@hive.apache.org by sz...@apache.org on 2014/08/14 19:53:32 UTC

svn commit: r1618002 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: HiveMapFunction.java HiveMapFunctionResultList.java SparkMapRecordHandler.java

Author: szehon
Date: Thu Aug 14 17:53:32 2014
New Revision: 1618002

URL: http://svn.apache.org/r1618002
Log:
HIVE-7707 : Optimize SparkMapRecordHandler implementation (Chengxiang Li via Szehon) [Spark Branch]
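
For context, this commit replaces the Hadoop Mapper-style configure()/map() pair on SparkMapRecordHandler with an init()/process() pair: the output collector and reporter are wired up once at initialization instead of being null-checked and assigned on every record. A minimal before/after sketch of the call pattern (jobConf, output, key, and value are placeholders; method bodies and surrounding code are elided, and the actual signatures appear in the diffs below):

    // Before: collector and reporter passed with every record; the handler
    // branched on each call to lazily bind them (see the removed map()
    // method in the SparkMapRecordHandler diff below).
    SparkMapRecordHandler mapper = new SparkMapRecordHandler();
    mapper.configure(jobConf);
    mapper.map(key, value, output, Reporter.NULL);

    // After: bind once, then the per-record path is a single-argument call.
    SparkMapRecordHandler handler = new SparkMapRecordHandler();
    handler.init(jobConf, output, Reporter.NULL);
    handler.process(value);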

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1618002&r1=1618001&r2=1618002&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java Thu Aug 14 17:53:32 2014
@@ -22,6 +22,7 @@ import java.util.Iterator;
 
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 
 import scala.Tuple2;
@@ -45,10 +46,12 @@ BytesWritable, BytesWritable> {
       jobConf = KryoSerializer.deserializeJobConf(this.buffer);
     }
 
-    SparkMapRecordHandler mapper = new SparkMapRecordHandler();
-    mapper.configure(jobConf);
+    SparkMapRecordHandler mapRecordHandler = new SparkMapRecordHandler();
+    HiveMapFunctionResultList result = new HiveMapFunctionResultList(jobConf, it, mapRecordHandler);
+    //TODO: implement a Spark-specific Reporter to collect stats; refer to HIVE-7709.
+    mapRecordHandler.init(jobConf, result, Reporter.NULL);
 
-    return new HiveMapFunctionResultList(jobConf, it, mapper);
+    return result;
   }
 
 }
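
Note the ordering in the hunk above: the result list is constructed around the handler, then handed to init() as the handler's OutputCollector, so the same object serves both as the collection Spark consumes and as the sink the operator tree writes into. A simplified, hypothetical sketch of that dual role (the real class extends a Hive base class whose name is truncated in the hunk header below, and it also pulls input rows through the record handler lazily, which is omitted here; all names are illustrative):

    import java.util.ArrayDeque;
    import java.util.Iterator;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapred.OutputCollector;
    import scala.Tuple2;

    // Illustrative only: buffers pairs emitted by the operator tree and
    // replays them to the caller.
    class ResultListSketch
        implements OutputCollector<BytesWritable, BytesWritable>,
                   Iterator<Tuple2<BytesWritable, BytesWritable>> {

      private final ArrayDeque<Tuple2<BytesWritable, BytesWritable>> buffer =
          new ArrayDeque<Tuple2<BytesWritable, BytesWritable>>();

      @Override
      public void collect(BytesWritable key, BytesWritable value) {
        // The operator tree's terminal operators write output rows here.
        buffer.add(new Tuple2<BytesWritable, BytesWritable>(key, value));
      }

      @Override
      public boolean hasNext() {
        return !buffer.isEmpty();
      }

      @Override
      public Tuple2<BytesWritable, BytesWritable> next() {
        return buffer.poll();
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    }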

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java?rev=1618002&r1=1618001&r2=1618002&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java Thu Aug 14 17:53:32 2014
@@ -44,7 +44,7 @@ public class HiveMapFunctionResultList e
   @Override
   protected void processNextRecord(Tuple2<BytesWritable, BytesWritable> inputRecord)
       throws IOException {
-    recordHandler.map(inputRecord._1(), inputRecord._2(), this, Reporter.NULL);
+    recordHandler.process(inputRecord._2());
   }
 
   @Override
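
With the collector and reporter bound at init() time, the per-record path above shrinks to forwarding just the value half of the tuple: the map-side operator tree consumes the serialized row, not the input key. Driving the handler then looks roughly like this (a hypothetical driver loop, not code from this patch; inputIterator and recordHandler are placeholders):

    // recordHandler was initialized via init(jobConf, resultList, Reporter.NULL)
    while (inputIterator.hasNext()) {
      Tuple2<BytesWritable, BytesWritable> record = inputIterator.next();
      recordHandler.process(record._2());  // record._1() is not needed map-side
    }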

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1618002&r1=1618001&r2=1618002&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java Thu Aug 14 17:53:32 2014
@@ -77,7 +77,7 @@ public class SparkMapRecordHandler {
 
   private final ExecMapperContext execContext = new ExecMapperContext();
 
-  public void configure(JobConf job) {
+  public void init(JobConf job, OutputCollector output, Reporter reporter) {
     // Allocate the bean at the beginning -
     memoryMXBean = ManagementFactory.getMemoryMXBean();
     l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
@@ -128,6 +128,12 @@ public class SparkMapRecordHandler {
       mo.initializeLocalWork(jc);
       mo.initialize(jc, null);
 
+      oc = output;
+      rp = reporter;
+      OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
+      mo.setReporter(rp);
+      MapredContext.get().setReporter(reporter);
+
       if (localWork == null) {
         return;
       }
@@ -152,15 +158,7 @@ public class SparkMapRecordHandler {
     }
   }
 
-  public void map(Object key, Object value, OutputCollector output,
-    Reporter reporter) throws IOException {
-    if (oc == null) {
-      oc = output;
-      rp = reporter;
-      OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
-      mo.setReporter(rp);
-      MapredContext.get().setReporter(reporter);
-    }
+  public void process(Object value) throws IOException {
     // reset the execContext for each new row
     execContext.resetRow();
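
Net effect: SparkMapRecordHandler now follows a straightforward init-once / process-per-row lifecycle rather than Hadoop's configure()/map() contract, and the "if (oc == null)" branch that previously ran on every record is gone. A minimal usage sketch under that contract (the teardown step is assumed and not shown in these hunks; rows and resultList are placeholders):

    SparkMapRecordHandler handler = new SparkMapRecordHandler();
    handler.init(jobConf, resultList, Reporter.NULL);  // one-time wiring of oc/rp
    while (rows.hasNext()) {
      handler.process(rows.next());  // per row: reset execContext, feed the operator tree
    }
    // teardown (flushing/closing the operator tree) is assumed to happen
    // in a separate close step not shown in this diff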