Posted to commits@hive.apache.org by zs...@apache.org on 2010/04/07 02:16:16 UTC

svn commit: r931375 - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/io/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/plan/

Author: zshao
Date: Wed Apr  7 00:16:16 2010
New Revision: 931375

URL: http://svn.apache.org/viewvc?rev=931375&view=rev
Log:
HIVE-1280. Add option to CombineHiveInputFormat for non-splittable inputs. (Namit Jain via zshao)
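
This commit introduces the configuration variable
hive.hadoop.supports.splittable.combineinputformat (default false). When it is
enabled, CombineHiveInputFormat scans the input tree for compressed files,
which are not splittable, and falls back to plain HiveInputFormat splits if it
finds any (the workaround for MAPREDUCE-1597 in the diff below). A minimal
sketch of enabling and reading the flag; the class name is hypothetical, and
the String-based setter is used because HiveConf extends Hadoop's
Configuration:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class CombineFlagDemo {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf(CombineFlagDemo.class);
        // Plain Configuration setter; avoids assuming a setBoolVar helper.
        conf.setBoolean("hive.hadoop.supports.splittable.combineinputformat", true);
        // Read back through the new ConfVars entry added in this commit.
        System.out.println(conf.getBoolVar(
            HiveConf.ConfVars.HIVE_COMBINE_INPUT_FORMAT_SUPPORTS_SPLITTABLE)); // true
      }
    }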

Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Apr  7 00:16:16 2010
@@ -345,6 +345,9 @@ Trunk -  Unreleased
     HIVE-1290. Sort merge join does not work with bucketizedhiveinputformat.
     (Namit Jain via Ning Zhang)
 
+    HIVE-1280. Add option to CombineHiveInputFormat for non-splittable inputs.
+    (Namit Jain via zshao)
+
 Release 0.5.0 -  Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Apr  7 00:16:16 2010
@@ -219,16 +219,18 @@ public class HiveConf extends Configurat
     HIVEENFORCEBUCKETING("hive.enforce.bucketing", false),
     HIVEENFORCESORTING("hive.enforce.sorting", false),
     HIVEPARTITIONER("hive.mapred.partitioner", "org.apache.hadoop.hive.ql.io.DefaultHivePartitioner"),
-    
+
     HIVESCRIPTOPERATORTRUST("hive.exec.script.trust", false),
 
+    HIVE_COMBINE_INPUT_FORMAT_SUPPORTS_SPLITTABLE("hive.hadoop.supports.splittable.combineinputformat", false),
+
     // Optimizer
     HIVEOPTCP("hive.optimize.cp", true), // column pruner
     HIVEOPTPPD("hive.optimize.ppd", true), // predicate pushdown
     HIVEOPTGROUPBY("hive.optimize.groupby", true), // optimize group by
     HIVEOPTBUCKETMAPJOIN("hive.optimize.bucketmapjoin", false), // optimize bucket map join
     HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
-    HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true), 
+    HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
     ;
 
     public final String varname;

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Wed Apr  7 00:16:16 2010
@@ -26,6 +26,8 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Map;
+import java.util.Queue;
+import java.util.LinkedList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,7 +51,6 @@ import org.apache.hadoop.mapred.RecordRe
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
 
-
 /**
  * CombineHiveInputFormat is a parameterized InputFormat which looks at the path
  * name and determine the correct InputFormat for that path name from
@@ -239,37 +240,50 @@ public class CombineHiveInputFormat<K ex
     for (Path path : paths) {
       LOG.info("CombineHiveInputSplit creating pool for " + path);
 
-      // The following code should be removed, once
-      // https://issues.apache.org/jira/browse/MAPREDUCE-1597 is fixed.
-      // Hadoop does not handle non-splitable files correctly for CombineFileInputFormat,
-      // so don't use CombineFileInputFormat for non-splittable files
-      FileSystem inpFs = path.getFileSystem(job);
-
-      FileStatus fStats = inpFs.getFileStatus(path);
-      Path tstPath = path;
-
-      // If path is a directory
-      if (fStats.isDir()) {
-        FileStatus[] fStatus = inpFs.listStatus(path);
-        if (fStatus.length > 0) {
-          tstPath = fStatus[0].getPath();
-        }
-      }
-
       PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, path);
+      TableDesc tableDesc = part.getTableDesc();
+      if ((tableDesc != null) && tableDesc.isNonNative()) {
+        return super.getSplits(job, numSplits);
+      }
 
       // Use HiveInputFormat if any of the paths is not splittable
       Class inputFormatClass = part.getInputFileFormatClass();
       InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
 
-      TableDesc tableDesc = part.getTableDesc();
-      if ((tableDesc != null) && tableDesc.isNonNative()) {
-        return super.getSplits(job, numSplits);
-      }
+      // Since there is no easy way of knowing whether MAPREDUCE-1597 is present in the tree or not,
+      // we use a configuration variable to control this behavior
+      if (this.mrwork != null && this.mrwork.getHadoopSupportsSplittable()) {
+        // The following code should be removed, once
+        // https://issues.apache.org/jira/browse/MAPREDUCE-1597 is fixed.
+        // Hadoop does not handle non-splittable files correctly for CombineFileInputFormat,
+        // so don't use CombineFileInputFormat for non-splittable files
+        FileSystem inpFs = path.getFileSystem(job);
+
+        if (inputFormat instanceof TextInputFormat) {
+          Queue<Path> dirs = new LinkedList<Path>();
+          FileStatus fStats = inpFs.getFileStatus(path);
+
+          // If path is a directory
+          if (fStats.isDir()) {
+            dirs.offer(path);
+          }
+          else if ((new CompressionCodecFactory(job)).getCodec(path) != null) {
+            return super.getSplits(job, numSplits);
+          }
 
-      if ((inputFormat instanceof TextInputFormat) &&
-          ((new CompressionCodecFactory(job)).getCodec(tstPath) != null)) {
-        return super.getSplits(job, numSplits);
+          while (dirs.peek() != null) {
+            Path tstPath = dirs.remove();
+            FileStatus[] fStatus = inpFs.listStatus(tstPath);
+            for (int idx = 0; idx < fStatus.length; idx++) {
+              if (fStatus[idx].isDir()) {
+                dirs.offer(fStatus[idx].getPath());
+              }
+              else if ((new CompressionCodecFactory(job)).getCodec(fStatus[idx].getPath()) != null) {
+                return super.getSplits(job, numSplits);
+              }
+            }
+          }
+        }
       }
 
       if (inputFormat instanceof SymlinkTextInputFormat) {
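
Compared to the code it replaces, which looked only at the first file in a
directory, the new check above walks the entire input directory tree
breadth-first and bails out to HiveInputFormat the moment it sees any
compressed (hence non-splittable) file. A standalone sketch of that walk, not
the committed code itself; the class name is hypothetical, and the codec
factory is hoisted out of the loop where the patch constructs one per file:

    import java.io.IOException;
    import java.util.LinkedList;
    import java.util.Queue;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.mapred.JobConf;

    public class NonSplittableCheck {
      // Returns true if any file under 'path' has a registered compression
      // codec; the caller should then avoid CombineFileInputFormat.
      static boolean hasCompressedFile(FileSystem fs, Path path, JobConf job)
          throws IOException {
        CompressionCodecFactory codecs = new CompressionCodecFactory(job);
        if (!fs.getFileStatus(path).isDir()) {
          return codecs.getCodec(path) != null;  // single-file input
        }
        Queue<Path> dirs = new LinkedList<Path>();
        dirs.offer(path);
        while (dirs.peek() != null) {            // breadth-first traversal
          for (FileStatus child : fs.listStatus(dirs.remove())) {
            if (child.isDir()) {
              dirs.offer(child.getPath());       // visit subdirectory later
            } else if (codecs.getCodec(child.getPath()) != null) {
              return true;                       // compressed file found
            }
          }
        }
        return false;
      }
    }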

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Wed Apr  7 00:16:16 2010
@@ -77,7 +77,7 @@ public class GenMRFileSink1 implements N
 
   /**
    * File Sink Operator encountered.
-   * 
+   *
    * @param nd
    *          the file sink operator encountered
    * @param opProcCtx
@@ -158,8 +158,8 @@ public class GenMRFileSink1 implements N
         new ArrayList<ExprNodeDesc>(), valueCols, outputColumns, false, -1, -1,
         -1);
     OperatorFactory.getAndMakeChild(rsDesc, fsRS, ts_op);
-    MapredWork cplan = GenMapRedUtils.getMapRedWork();
     ParseContext parseCtx = ctx.getParseCtx();
+    MapredWork cplan = GenMapRedUtils.getMapRedWork(parseCtx.getConf());
 
     Task<? extends Serializable> mergeTask = TaskFactory.get(cplan, parseCtx
         .getConf());

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java Wed Apr  7 00:16:16 2010
@@ -42,7 +42,7 @@ public class GenMRTableScan1 implements 
 
   /**
    * Table Sink encountered.
-   * 
+   *
    * @param nd
    *          the table sink operator encountered
    * @param opProcCtx
@@ -57,8 +57,9 @@ public class GenMRTableScan1 implements 
         .getMapCurrCtx();
 
     // create a dummy task
-    Task<? extends Serializable> currTask = TaskFactory.get(GenMapRedUtils
-        .getMapRedWork(), parseCtx.getConf());
+    Task<? extends Serializable> currTask =
+      TaskFactory.get(GenMapRedUtils.getMapRedWork(parseCtx.getConf()),
+                      parseCtx.getConf());
     Operator<? extends Serializable> currTopOp = op;
     ctx.setCurrTask(currTask);
     ctx.setCurrTopOp(currTopOp);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Wed Apr  7 00:16:16 2010
@@ -65,9 +65,9 @@ public class GenMRUnion1 implements Node
    * all the sub-queries are map-only, dont do anything. However, if there is a
    * mapjoin followed by the union, merge at the union Otherwise, insert a
    * FileSink on top of all the sub-queries.
-   * 
+   *
    * This can be optimized later on.
-   * 
+   *
    * @param nd
    *          the file sink operator encountered
    * @param opProcCtx
@@ -132,7 +132,7 @@ public class GenMRUnion1 implements Node
     // union is encountered for the first time
     if (uCtxTask == null) {
       uCtxTask = new GenMRUnionCtx();
-      uPlan = GenMapRedUtils.getMapRedWork();
+      uPlan = GenMapRedUtils.getMapRedWork(parseCtx.getConf());
       uTask = TaskFactory.get(uPlan, parseCtx.getConf());
       uCtxTask.setUTask(uTask);
       ctx.setUnionTask(union, uCtxTask);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Wed Apr  7 00:16:16 2010
@@ -471,8 +471,8 @@ public final class GenMapRedUtils {
   public static void splitPlan(ReduceSinkOperator op, GenMRProcContext opProcCtx)
       throws SemanticException {
     // Generate a new task
-    MapredWork cplan = getMapRedWork();
     ParseContext parseCtx = opProcCtx.getParseCtx();
+    MapredWork cplan = getMapRedWork(parseCtx.getConf());
     Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx
         .getConf());
     Operator<? extends Serializable> reducer = op.getChildOperators().get(0);
@@ -736,14 +736,15 @@ public final class GenMapRedUtils {
    *
    * @return the new plan
    */
-  public static MapredWork getMapRedWork() {
+  public static MapredWork getMapRedWork(HiveConf conf) {
     MapredWork work = new MapredWork();
     work.setPathToAliases(new LinkedHashMap<String, ArrayList<String>>());
     work.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
-    work
-        .setAliasToWork(new LinkedHashMap<String, Operator<? extends Serializable>>());
+    work.setAliasToWork(new LinkedHashMap<String, Operator<? extends Serializable>>());
     work.setTagToValueDesc(new ArrayList<TableDesc>());
     work.setReducer(null);
+    work.setHadoopSupportsSplittable(
+      conf.getBoolVar(HiveConf.ConfVars.HIVE_COMBINE_INPUT_FORMAT_SUPPORTS_SPLITTABLE));
     return work;
   }
 
@@ -913,7 +914,7 @@ public final class GenMapRedUtils {
     // union is encountered for the first time
     if (uCtxTask == null) {
       uCtxTask = new GenMRUnionCtx();
-      uPlan = GenMapRedUtils.getMapRedWork();
+      uPlan = GenMapRedUtils.getMapRedWork(parseCtx.getConf());
       uTask = TaskFactory.get(uPlan, parseCtx.getConf());
       uCtxTask.setUTask(uTask);
       ctx.setUnionTask(union, uCtxTask);
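
All of these call-site changes are mechanical: getMapRedWork now takes the
session HiveConf, so the flag is captured into every MapredWork at
plan-generation time, and CombineHiveInputFormat later consults the plan
(mrwork.getHadoopSupportsSplittable()) rather than the live JobConf. A hedged
illustration of that contract (class name hypothetical):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
    import org.apache.hadoop.hive.ql.plan.MapredWork;

    public class PlanFlagDemo {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf(PlanFlagDemo.class);
        conf.setBoolean("hive.hadoop.supports.splittable.combineinputformat", true);
        MapredWork work = GenMapRedUtils.getMapRedWork(conf);
        // The flag now travels with the plan; later conf edits do not affect it.
        System.out.println(work.getHadoopSupportsSplittable()); // true
      }
    }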

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Wed Apr  7 00:16:16 2010
@@ -126,8 +126,8 @@ public final class MapJoinFactory {
       AbstractMapJoinOperator<MapJoinDesc> mapJoin = (AbstractMapJoinOperator<MapJoinDesc>) nd;
       GenMRProcContext opProcCtx = (GenMRProcContext) procCtx;
 
-      MapredWork cplan = GenMapRedUtils.getMapRedWork();
       ParseContext parseCtx = opProcCtx.getParseCtx();
+      MapredWork cplan = GenMapRedUtils.getMapRedWork(parseCtx.getConf());
       Task<? extends Serializable> redTask = TaskFactory.get(cplan, parseCtx
           .getConf());
       Task<? extends Serializable> currTask = opProcCtx.getCurrTask();
@@ -211,7 +211,7 @@ public final class MapJoinFactory {
         ctx.setMapJoinCtx(mapJoin, mjCtx);
       }
 
-      MapredWork mjPlan = GenMapRedUtils.getMapRedWork();
+      MapredWork mjPlan = GenMapRedUtils.getMapRedWork(parseCtx.getConf());
       Task<? extends Serializable> mjTask = TaskFactory.get(mjPlan, parseCtx
           .getConf());
 

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Wed Apr  7 00:16:16 2010
@@ -61,6 +61,8 @@ public class MapredWork implements Seria
   private Integer minSplitSize;
 
   private boolean needsTagging;
+  private boolean hadoopSupportsSplittable;
+
   private MapredLocalWork mapLocalWork;
   private String inputformat;
 
@@ -75,7 +77,8 @@ public class MapredWork implements Seria
       final LinkedHashMap<String, Operator<? extends Serializable>> aliasToWork,
       final TableDesc keyDesc, List<TableDesc> tagToValueDesc,
       final Operator<?> reducer, final Integer numReduceTasks,
-      final MapredLocalWork mapLocalWork) {
+      final MapredLocalWork mapLocalWork,
+      final boolean hadoopSupportsSplittable) {
     this.command = command;
     this.pathToAliases = pathToAliases;
     this.pathToPartitionInfo = pathToPartitionInfo;
@@ -86,6 +89,7 @@ public class MapredWork implements Seria
     this.numReduceTasks = numReduceTasks;
     this.mapLocalWork = mapLocalWork;
     aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
+    this.hadoopSupportsSplittable = hadoopSupportsSplittable;
   }
 
   public String getCommand() {
@@ -194,7 +198,7 @@ public class MapredWork implements Seria
   /**
    * If the number of reducers is -1, the runtime will automatically figure it
    * out by input data size.
-   * 
+   *
    * The number of reducers will be a positive number only in case the target
    * table is bucketed into N buckets (through CREATE TABLE). This feature is
    * not supported yet, so the number of reducers will always be -1 for now.
@@ -294,6 +298,14 @@ public class MapredWork implements Seria
     this.needsTagging = needsTagging;
   }
 
+  public boolean getHadoopSupportsSplittable() {
+    return hadoopSupportsSplittable;
+  }
+
+  public void setHadoopSupportsSplittable(boolean hadoopSupportsSplittable) {
+    this.hadoopSupportsSplittable = hadoopSupportsSplittable;
+  }
+
   public Integer getMinSplitSize() {
     return minSplitSize;
   }
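
The new field gets a conventional public getter/setter pair because, in this
era, Hive ships the plan to the execution side by bean-serializing MapredWork
with java.beans.XMLEncoder; a property without public accessors would be
silently dropped in transit. A small round-trip sketch under that assumption
(class name hypothetical):

    import java.beans.XMLDecoder;
    import java.beans.XMLEncoder;
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.hadoop.hive.ql.plan.MapredWork;

    public class BeanRoundTrip {
      public static void main(String[] args) {
        MapredWork work = new MapredWork();
        work.setHadoopSupportsSplittable(true);
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        XMLEncoder enc = new XMLEncoder(buf);
        enc.writeObject(work);   // relies on the public accessors added above
        enc.close();
        XMLDecoder dec = new XMLDecoder(new ByteArrayInputStream(buf.toByteArray()));
        MapredWork copy = (MapredWork) dec.readObject();
        dec.close();
        System.out.println(copy.getHadoopSupportsSplittable()); // true
      }
    }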

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=931375&r1=931374&r2=931375&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Wed Apr  7 00:16:16 2010
@@ -53,6 +53,7 @@ import org.apache.hadoop.mapred.InputFor
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.hive.conf.HiveConf;
 
 /**
  * PlanUtils.
@@ -72,11 +73,16 @@ public final class PlanUtils {
 
   @SuppressWarnings("nls")
   public static MapredWork getMapRedWork() {
-    return new MapredWork("", new LinkedHashMap<String, ArrayList<String>>(),
+    try {
+      return new MapredWork("", new LinkedHashMap<String, ArrayList<String>>(),
         new LinkedHashMap<String, PartitionDesc>(),
         new LinkedHashMap<String, Operator<? extends Serializable>>(),
         new TableDesc(), new ArrayList<TableDesc>(), null, Integer.valueOf(1),
-        null);
+        null, Hive.get().getConf().getBoolVar(
+          HiveConf.ConfVars.HIVE_COMBINE_INPUT_FORMAT_SUPPORTS_SPLITTABLE));
+    } catch (HiveException ex) {
+      throw new RuntimeException(ex);
+    }
   }
 
   /**
@@ -389,7 +395,7 @@ public final class PlanUtils {
 
   /**
    * Create the reduce sink descriptor.
-   * 
+   *
    * @param keyCols
    *          The columns to be stored in the key
    * @param valueCols
@@ -441,7 +447,7 @@ public final class PlanUtils {
 
   /**
    * Create the reduce sink descriptor.
-   * 
+   *
    * @param keyCols
    *          The columns to be stored in the key
    * @param valueCols
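
One design note on the PlanUtils change above: PlanUtils.getMapRedWork() is
static and takes no arguments, so instead of widening its signature the patch
reaches the session configuration through the thread-local Hive.get(),
converting the checked HiveException into a RuntimeException. Callers that
already hold a HiveConf should prefer the new
GenMapRedUtils.getMapRedWork(conf) overload. A sketch of the lookup pattern
(class name hypothetical):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class SessionConfLookup {
      static HiveConf currentConf() {
        try {
          // Hive.get() is a thread-local session lookup; it can fail if no
          // session metadata is available, hence the checked exception.
          return Hive.get().getConf();
        } catch (HiveException ex) {
          throw new RuntimeException(ex);
        }
      }
    }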