You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/08/12 17:06:36 UTC

svn commit: r1617501 - /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java

Author: brock
Date: Tue Aug 12 15:06:35 2014
New Revision: 1617501

URL: http://svn.apache.org/r1617501
Log:
HIVE-7642 - Set hive input format by configuration. [Spark Branch] (Chengxiang Li via Brock)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1617501&r1=1617500&r2=1617501&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Tue Aug 12 15:06:35 2014
@@ -23,13 +23,17 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -38,6 +42,7 @@ import org.apache.hadoop.hive.ql.plan.Sp
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -46,6 +51,8 @@ import org.apache.spark.api.java.JavaPai
 import org.apache.spark.api.java.JavaSparkContext;
 
 public class SparkPlanGenerator {
+  private static final Log LOG = LogFactory.getLog(SparkPlanGenerator.class);
+
   private JavaSparkContext sc;
   private final JobConf jobConf;
   private Context context;
@@ -86,13 +93,39 @@ public class SparkPlanGenerator {
+  // NOTE(review): this is a hunk from an archived svn diff; leading '+'/'-'/' '
+  // characters are diff markers, not Java source.
+  /**
+   * Builds the Hadoop-backed RDD for one map-side work unit: sets the input
+   * paths on the job conf, publishes the MapWork, resolves the input format,
+   * and hands everything to Spark's hadoopRDD().
+   */
   private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(MapWork mapWork) throws Exception {
     List<Path> inputPaths = Utilities.getInputPaths(jobConf, mapWork, scratchDir, context, false);
     Utilities.setInputPaths(jobConf, inputPaths);
-    Class ifClass = HiveInputFormat.class;
+    // Presumably registers the MapWork in the job conf so the input format /
+    // ExecMapper can recover it at runtime — confirm against Utilities.setMapWork.
+    Utilities.setMapWork(jobConf, mapWork, scratchDir, true);
+    // Input format is now resolved from configuration (this change's purpose)
+    // instead of being hard-coded to HiveInputFormat.
+    Class ifClass = getInputFormat(mapWork);
 
     // The mapper class is expected by the HiveInputFormat.
     jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
     return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
   }
 
+  /**
+   * Resolves the input format class for the given map work.
+   *
+   * Precedence: a MapWork-level input format (if set) is written into the job
+   * conf and wins; otherwise the conf's hive.input.format value is used; a
+   * null/blank conf value falls back to the Hadoop shim's default; bucketized
+   * map work unconditionally forces BucketizedHiveInputFormat.
+   *
+   * NOTE(review): raw {@code Class} type — {@code Class<?>} would avoid
+   * unchecked warnings, but the code cannot be altered in this archived diff.
+   *
+   * @param mWork map-side work whose input format preference is consulted
+   * @return the resolved input format class
+   * @throws HiveException if the named class cannot be loaded
+   */
+  private Class getInputFormat(MapWork mWork) throws HiveException {
+    if (mWork.getInputformat() != null) {
+      // The MapWork's own setting overrides the conf: push it in, then read back.
+      HiveConf.setVar(jobConf, HiveConf.ConfVars.HIVEINPUTFORMAT, mWork.getInputformat());
+    }
+    String inpFormat = HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVEINPUTFORMAT);
+    if ((inpFormat == null) || (StringUtils.isBlank(inpFormat))) {
+      // Nothing configured: fall back to the shim-provided default class name.
+      inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
+    }
+
+    // Bucketized map work trumps whatever was configured above.
+    if (mWork.isUseBucketizedHiveInputFormat()) {
+      inpFormat = BucketizedHiveInputFormat.class.getName();
+    }
+
+    Class inputFormatClass;
+    try {
+      inputFormatClass = Class.forName(inpFormat);
+    } catch (ClassNotFoundException e) {
+      // Log with cause, then rethrow as HiveException preserving the cause.
+      String message = "Failed to load specified input format class:" + inpFormat;
+      LOG.error(message, e);
+      throw new HiveException(message, e);
+    }
+
+    return inputFormatClass;
+  }
+
   private SparkTran generate(BaseWork bw) throws IOException, HiveException {
     // initialize stats publisher if necessary
     if (bw.isGatheringStats()) {