Posted to commits@hive.apache.org by sz...@apache.org on 2014/08/12 08:24:00 UTC

svn commit: r1617420 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/mr/ExecReducer.java exec/spark/HiveReduceFunctionResultList.java exec/spark/SparkPlanGenerator.java plan/SparkEdgeProperty.java

Author: szehon
Date: Tue Aug 12 06:24:00 2014
New Revision: 1617420

URL: http://svn.apache.org/r1617420
Log:
HIVE-7626 : Reduce operator initialization failed when running multiple MR queries on Spark (Rui Li via Szehon) [Spark Branch]
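
What the hunks below do, taken together: SparkPlanGenerator had been reusing a single JobConf for every unit of work, so each call to Utilities.setReduceWork serialized its plan under the same plan path; in a query with more than one MR stage, a later reduce stage could then pick up the wrong plan and its reduce operator failed to initialize. The fix clones the JobConf per ReduceWork and blanks HiveConf.ConfVars.PLAN so every stage serializes its plan to a fresh path, drops a duplicate Utilities.setMapWork call from generateRDD, and adds ExecReducer.getReducer() so that HiveReduceFunctionResultList can attach itself as the output collector of the reduce operator tree.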

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1617420&r1=1617419&r2=1617420&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Tue Aug 12 06:24:00 2014
@@ -321,4 +321,8 @@ public class ExecReducer extends MapRedu
       Utilities.clearWorkMap();
     }
   }
+
+  public Operator<?> getReducer() {
+    return reducer;
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java?rev=1617420&r1=1617419&r2=1617420&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java Tue Aug 12 06:24:00 2014
@@ -18,12 +18,16 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.Reporter;
 import scala.Tuple2;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Iterator;
 
 public class HiveReduceFunctionResultList extends
@@ -41,6 +45,7 @@ public class HiveReduceFunctionResultLis
       ExecReducer reducer) {
     super(conf, inputIterator);
     this.reducer = reducer;
+    setOutputCollector();
   }
 
   @Override
@@ -58,4 +63,11 @@ public class HiveReduceFunctionResultLis
   protected void closeRecordProcessor() {
     reducer.close();
   }
+
+  private void setOutputCollector() {
+    if (reducer != null && reducer.getReducer() != null) {
+      OperatorUtils.setChildrenCollector(
+          Arrays.<Operator<? extends OperatorDesc>>asList(reducer.getReducer()), this);
+    }
+  }
 }
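
The constructor change above attaches the result list itself as the output collector for the reduce operator tree, which only works because HiveReduceFunctionResultList (via its superclass) acts as a Hadoop OutputCollector. A minimal, self-contained sketch of that pattern; everything here except org.apache.hadoop.mapred.OutputCollector, BytesWritable, and Tuple2 is an illustrative placeholder, not a Hive class:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapred.OutputCollector;
    import scala.Tuple2;

    // Illustrative stand-in: operators "collect" rows into a buffer that the
    // enclosing Spark function can later drain as an iterator.
    class BufferingCollector implements OutputCollector<BytesWritable, BytesWritable> {
      private final List<Tuple2<BytesWritable, BytesWritable>> buffer =
          new ArrayList<Tuple2<BytesWritable, BytesWritable>>();

      @Override
      public void collect(BytesWritable key, BytesWritable value) throws IOException {
        buffer.add(new Tuple2<BytesWritable, BytesWritable>(key, value));
      }

      Iterator<Tuple2<BytesWritable, BytesWritable>> drain() {
        return buffer.iterator();
      }
    }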

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1617420&r1=1617419&r2=1617420&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Tue Aug 12 06:24:00 2014
@@ -47,7 +47,7 @@ import org.apache.spark.api.java.JavaSpa
 
 public class SparkPlanGenerator {
   private JavaSparkContext sc;
-  private JobConf jobConf;
+  private final JobConf jobConf;
   private Context context;
   private Path scratchDir;
 
@@ -86,7 +86,6 @@ public class SparkPlanGenerator {
   private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(MapWork mapWork) throws Exception {
     List<Path> inputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, context, false);
     Utilities.setInputPaths(jobConf, inputPaths);
-    Utilities.setMapWork(jobConf, mapWork, scratchDir, true);
     Class ifClass = HiveInputFormat.class;
 
     // The mapper class is expected by the HiveInputFormat.
@@ -119,7 +118,7 @@ public class SparkPlanGenerator {
 
   private MapTran generate(MapWork mw) throws IOException {
     MapTran result = new MapTran();
-    Utilities.setMapWork(jobConf, mw, scratchDir, true);
+    Utilities.setMapWork(jobConf, mw, scratchDir, false);
     Utilities.createTmpDirs(jobConf, mw);
     jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
     byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
@@ -137,11 +136,15 @@ public class SparkPlanGenerator {
 
   private ReduceTran generate(ReduceWork rw) throws IOException {
     ReduceTran result = new ReduceTran();
-    Utilities.setReduceWork(jobConf, rw, scratchDir, true);
-    Utilities.createTmpDirs(jobConf, rw);
-    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
-    HiveReduceFunction mapFunc = new HiveReduceFunction(confBytes);
-    result.setReduceFunction(mapFunc);
+    // Clone jobConf for each ReduceWork so we can have multiple of them
+    JobConf newJobConf = new JobConf(jobConf);
+    // Make sure we'll use a different plan path from the original one
+    HiveConf.setVar(newJobConf, HiveConf.ConfVars.PLAN, "");
+    Utilities.setReduceWork(newJobConf, rw, scratchDir, false);
+    Utilities.createTmpDirs(newJobConf, rw);
+    byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
+    HiveReduceFunction redFunc = new HiveReduceFunction(confBytes);
+    result.setReduceFunction(redFunc);
     return result;
   }
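
The per-ReduceWork cloning above is the heart of the fix. A stripped-down sketch of the idiom, assuming only the JobConf copy constructor and the HiveConf.setVar call used verbatim in the hunk; the wrapper class is illustrative:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.mapred.JobConf;

    class PlanPathIsolation {  // illustrative holder, not a Hive class
      // Clone the shared conf for one stage, then blank the plan path so the
      // following Utilities.setReduceWork call generates a fresh path instead
      // of overwriting the plan written for an earlier stage.
      static JobConf confForStage(JobConf shared) {
        JobConf stageConf = new JobConf(shared);
        HiveConf.setVar(stageConf, HiveConf.ConfVars.PLAN, "");
        return stageConf;
      }
    }

Each ReduceTran then carries its own Kryo-serialized conf, so executors deserialize the plan that belongs to their stage rather than whichever plan was written last.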
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java?rev=1617420&r1=1617419&r2=1617420&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java Tue Aug 12 06:24:00 2014
@@ -26,7 +26,7 @@ public class SparkEdgeProperty {
   public static long SHUFFLE_SORT = 2;  // Shuffle, keys are sorted
 
   private long edgeType;
-  
+
   private int numPartitions;
 
   public SparkEdgeProperty(long edgeType, int numPartitions) {