Posted to commits@hive.apache.org by br...@apache.org on 2014/08/19 23:01:57 UTC

svn commit: r1618986 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/spark/HiveReduceFunction.java exec/spark/SparkPlanGenerator.java io/IOContext.java

Author: brock
Date: Tue Aug 19 21:01:57 2014
New Revision: 1618986

URL: http://svn.apache.org/r1618986
Log:
HIVE-7773 - Union all query finished with errors [Spark Branch] (Rui Li via Brock)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1618986&r1=1618985&r2=1618986&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java Tue Aug 19 21:01:57 2014
@@ -45,7 +45,6 @@ BytesWritable, BytesWritable> {
   call(Iterator<Tuple2<BytesWritable,Iterable<BytesWritable>>> it) throws Exception {
     if (jobConf == null) {
       jobConf = KryoSerializer.deserializeJobConf(this.buffer);
-      jobConf.set("mapred.reducer.class", ExecReducer.class.getName());
     }
 
     SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler();
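
With the removed line above, the reduce function no longer patches mapred.reducer.class at runtime; that setting now arrives already baked into the serialized conf built by cloneJobConf() in SparkPlanGenerator (next file). The surrounding null check is the usual lazy-deserialization pattern for configuration shipped to Spark executors. A minimal sketch of that pattern, using a hypothetical holder class and plain java.util.Properties in place of Hive's KryoSerializer and JobConf (names below are illustrative, not the committed code):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.Serializable;
    import java.util.Properties;

    public class LazyConfHolder implements Serializable {
      private final byte[] confBytes;      // shipped to the executor together with the function
      private transient Properties conf;   // rebuilt lazily, once per deserialized instance

      public LazyConfHolder(byte[] confBytes) { this.confBytes = confBytes; }

      Properties conf() throws IOException {
        if (conf == null) {                // same null-check-then-deserialize shape as HiveReduceFunction.call()
          conf = new Properties();
          conf.load(new ByteArrayInputStream(confBytes));  // stand-in for KryoSerializer.deserializeJobConf(buffer)
        }
        return conf;
      }
    }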

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1618986&r1=1618985&r2=1618986&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Tue Aug 19 21:01:57 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -80,8 +81,9 @@ public class SparkPlanGenerator {
             "The roots in the SparkWork must be MapWork instances!");
       }
       MapWork mapWork = (MapWork) w;
-      SparkTran tran = generate(w);
-      JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(mapWork);
+      JobConf newJobConf = cloneJobConf(mapWork);
+      SparkTran tran = generate(newJobConf, mapWork);
+      JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(newJobConf, mapWork);
       trans.addTranWithInput(tran, input);
 
       while (sparkWork.getChildren(w).size() > 0) {
@@ -116,18 +118,11 @@ public class SparkPlanGenerator {
     return plan;
   }
 
-  private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(MapWork mapWork)
+  private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(JobConf jobConf, MapWork mapWork)
       throws Exception {
-    JobConf newJobConf = new JobConf(jobConf);
-    List<Path> inputPaths = Utilities.getInputPaths(newJobConf, mapWork,
-        scratchDir, context, false);
-    Utilities.setInputPaths(newJobConf, inputPaths);
-    Utilities.setMapWork(newJobConf, mapWork, scratchDir, true);
     Class ifClass = getInputFormat(mapWork);
 
-    // The mapper class is expected by the HiveInputFormat.
-    newJobConf.set("mapred.mapper.class", ExecMapper.class.getName());
-    return sc.hadoopRDD(newJobConf, ifClass, WritableComparable.class,
+    return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class,
         Writable.class);
   }
 
@@ -159,43 +154,10 @@ public class SparkPlanGenerator {
     return inputFormatClass;
   }
 
-  private SparkTran generate(BaseWork bw) throws Exception {
-    // initialize stats publisher if necessary
-    if (bw.isGatheringStats()) {
-      StatsPublisher statsPublisher;
-      StatsFactory factory = StatsFactory.newFactory(jobConf);
-      if (factory != null) {
-        statsPublisher = factory.getStatsPublisher();
-        if (!statsPublisher.init(jobConf)) { // creating stats table if not exists
-          if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
-            throw new HiveException(
-                ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
-          }
-        }
-      }
-    }
-    if (bw instanceof MapWork) {
-      return generate((MapWork) bw);
-    } else if (bw instanceof ReduceWork) {
-      return generate((ReduceWork) bw);
-    } else {
-      throw new IllegalArgumentException(
-          "Only MapWork and ReduceWork are expected");
-    }
-  }
-
-  private MapTran generate(MapWork mw) throws Exception {
-    JobConf newJobConf = new JobConf(jobConf);
+  private MapTran generate(JobConf jobConf, MapWork mw) throws Exception {
+    initStatsPublisher(mw);
     MapTran result = new MapTran();
-
-    List<Path> inputPaths = Utilities.getInputPaths(newJobConf, mw,
-      scratchDir, context, false);
-    Utilities.setInputPaths(newJobConf, inputPaths);
-
-    Utilities.setMapWork(newJobConf, mw, scratchDir, true);
-    Utilities.createTmpDirs(newJobConf, mw);
-    newJobConf.set("mapred.mapper.class", ExecMapper.class.getName());
-    byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
+    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
     HiveMapFunction mapFunc = new HiveMapFunction(confBytes);
     result.setMapFunction(mapFunc);
     return result;
@@ -210,14 +172,9 @@ public class SparkPlanGenerator {
     return new GroupByShuffler();
   }
 
-  private ReduceTran generate(ReduceWork rw) throws IOException {
+  private ReduceTran generate(ReduceWork rw) throws Exception {
     ReduceTran result = new ReduceTran();
-    // Clone jobConf for each ReduceWork so we can have multiple of them
-    JobConf newJobConf = new JobConf(jobConf);
-    // Make sure we'll use a different plan path from the original one
-    HiveConf.setVar(newJobConf, HiveConf.ConfVars.PLAN, "");
-    Utilities.setReduceWork(newJobConf, rw, scratchDir, false);
-    Utilities.createTmpDirs(newJobConf, rw);
+    JobConf newJobConf = cloneJobConf(rw);
     byte[] confBytes = KryoSerializer.serializeJobConf(newJobConf);
     HiveReduceFunction redFunc = new HiveReduceFunction(confBytes);
     result.setReduceFunction(redFunc);
@@ -229,4 +186,39 @@ public class SparkPlanGenerator {
     return result;
   }
 
+  private JobConf cloneJobConf(BaseWork work) throws Exception {
+    JobConf cloned = new JobConf(jobConf);
+    // Make sure we'll use a different plan path from the original one
+    HiveConf.setVar(cloned, HiveConf.ConfVars.PLAN, "");
+    if (work instanceof MapWork) {
+      List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork) work, scratchDir, context, false);
+      Utilities.setInputPaths(cloned, inputPaths);
+      Utilities.setMapWork(cloned, (MapWork) work, scratchDir, false);
+      Utilities.createTmpDirs(cloned, (MapWork) work);
+      cloned.set("mapred.mapper.class", ExecMapper.class.getName());
+    } else if (work instanceof ReduceWork) {
+      Utilities.setReduceWork(cloned, (ReduceWork) work, scratchDir, false);
+      Utilities.createTmpDirs(cloned, (ReduceWork) work);
+      cloned.set("mapred.reducer.class", ExecReducer.class.getName());
+    }
+    return cloned;
+  }
+
+  private void initStatsPublisher(BaseWork work) throws HiveException {
+    // initialize stats publisher if necessary
+    if (work.isGatheringStats()) {
+      StatsPublisher statsPublisher;
+      StatsFactory factory = StatsFactory.newFactory(jobConf);
+      if (factory != null) {
+        statsPublisher = factory.getStatsPublisher();
+        if (!statsPublisher.init(jobConf)) { // creating stats table if not exists
+          if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
+            throw new HiveException(
+                ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
+          }
+        }
+      }
+    }
+  }
+
 }
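
The crux of the fix is the cloneJobConf() helper above: instead of mutating one shared JobConf, SparkPlanGenerator now makes a copy per MapWork/ReduceWork, resets the plan path on the copy, and sets the Hadoop exec class there. A minimal sketch of that idea, assuming the HiveConf/JobConf APIs already imported in this file (the class and method names below are hypothetical, not the committed code):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.mapred.JobConf;

    public class PerWorkConf {
      // Hypothetical illustration of the per-work cloning pattern in cloneJobConf():
      // every MapWork/ReduceWork gets its own copy of the session JobConf, with the
      // plan path reset so each copy serializes its plan to a distinct location, and
      // the exec class set only on that copy.
      static JobConf cloneForWork(JobConf base, String execClassKey, String execClassName) {
        JobConf cloned = new JobConf(base);                    // copy; never mutate the shared conf
        HiveConf.setVar(cloned, HiveConf.ConfVars.PLAN, "");   // force a fresh plan path for this work
        cloned.set(execClassKey, execClassName);               // mapred.mapper.class or mapred.reducer.class
        return cloned;
      }
    }

With a clone per work unit, the multiple MapWorks produced by a union-all query no longer share and overwrite a single plan path, which is presumably the failure mode behind HIVE-7773.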

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1618986&r1=1618985&r2=1618986&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Tue Aug 19 21:01:57 2014
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.io;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.session.SessionState;
 
 
 /**
@@ -38,19 +37,12 @@ public class IOContext {
     protected synchronized IOContext initialValue() { return new IOContext(); }
  };
 
- private static IOContext ioContext = new IOContext();
-
   public static IOContext get() {
-    if (SessionState.get() == null) {
-      // this happens on the backend. only one io context needed.
-      return ioContext;
-    }
     return IOContext.threadLocal.get();
   }
 
   public static void clear() {
     IOContext.threadLocal.remove();
-    ioContext = new IOContext();
   }
 
   long currentBlockStart;
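
With the static fallback instance removed, IOContext.get() always returns the per-thread instance, and clear() drops only the calling thread's copy. A minimal sketch of that thread-local pattern (hypothetical class using standard java.lang.ThreadLocal, not Hive code):

    public class PerThreadContext {
      // Each thread lazily gets its own instance; clear() removes only the calling
      // thread's copy, mirroring what IOContext.get()/clear() now do.
      private static final ThreadLocal<PerThreadContext> LOCAL =
          ThreadLocal.withInitial(PerThreadContext::new);   // Java 8+; the Hive code overrides initialValue() instead

      long currentBlockStart;

      public static PerThreadContext get() { return LOCAL.get(); }
      public static void clear()           { LOCAL.remove(); }
    }

Sharing one process-wide IOContext would let concurrent tasks in a single Spark executor JVM overwrite each other's read state, so the strictly per-thread instance is presumably what the multi-task executor model requires.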