Posted to commits@hive.apache.org by na...@apache.org on 2010/08/23 23:32:55 UTC

svn commit: r988318 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java

Author: namit
Date: Mon Aug 23 21:32:55 2010
New Revision: 988318

URL: http://svn.apache.org/viewvc?rev=988318&view=rev
Log:
HIVE-1581. CompactIndexInputFormat should create split only for files in the index output file
(He Yongqiang via namit)
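
[Editor's note] In short, the patch stops this input format from computing splits over every
file under the job's input paths. When hive.index.compact.file is set, getSplits() now reads
the index result, collects the bucket files it names, rewrites the job's input paths to exactly
that comma-joined list, and only then computes splits per partition via the new doGetSplits().
Below is a minimal sketch (not part of the commit) of the path-joining step; the class name and
sample paths are made up for illustration.

  // Sketch only: mirrors the StringBuilder loop in the patch below.
  import java.util.LinkedHashSet;
  import java.util.Set;

  public class IndexPathJoinSketch {

    // Skip blank entries from the index output, join the rest with commas.
    static String joinIndexedPaths(Set<String> bucketFiles) {
      StringBuilder newInputPaths = new StringBuilder();
      boolean first = true;
      for (String path : bucketFiles) {
        if (path.trim().isEmpty()) {
          continue; // blank lines in the index output contribute nothing
        }
        if (!first) {
          newInputPaths.append(',');
        } else {
          first = false;
        }
        newInputPaths.append(path);
      }
      return newInputPaths.toString();
    }

    public static void main(String[] args) {
      Set<String> buckets = new LinkedHashSet<String>();
      buckets.add("/warehouse/t/ds=2010-08-23/bucket_00000");
      buckets.add("   "); // ignored, like the trim()/continue in the patch
      buckets.add("/warehouse/t/ds=2010-08-23/bucket_00001");
      // Prints the two real paths joined by a comma, in the form expected
      // by FileInputFormat.setInputPaths(job, ...).
      System.out.println(joinIndexedPaths(buckets));
    }
  }
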


Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=988318&r1=988317&r2=988318&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Mon Aug 23 21:32:55 2010
@@ -96,6 +96,9 @@ Trunk -  Unreleased
     versions 0.20.4 0.20.5 and cloudera CDH3 version
     (Basab Maulik via jvs)
 
+    HIVE-1581. CompactIndexInputFormat should create split only for files in the index output file
+    (He Yongqiang via namit)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java?rev=988318&r1=988317&r2=988318&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java Mon Aug 23 21:32:55 2010
@@ -2,13 +2,22 @@ package org.apache.hadoop.hive.ql.index.
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.IOPrepareCache;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 
@@ -20,25 +29,74 @@ public class HiveCompactIndexInputFormat
     super();
   }
 
+  public InputSplit[] doGetSplits(JobConf job, int numSplits) throws IOException {
+
+    super.init(job);
+
+    Path[] dirs = FileInputFormat.getInputPaths(job);
+    if (dirs.length == 0) {
+      throw new IOException("No input paths specified in job");
+    }
+    JobConf newjob = new JobConf(job);
+    ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+
+    // for each dir, get the InputFormat, and do getSplits.
+    for (Path dir : dirs) {
+      PartitionDesc part = HiveFileFormatUtils
+          .getPartitionDescFromPathRecursively(pathToPartitionInfo, dir,
+              IOPrepareCache.get().allocatePartitionDescMap());
+      // create a new InputFormat instance if this is the first time to see this
+      // class
+      Class inputFormatClass = part.getInputFileFormatClass();
+      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+      Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), newjob);
+
+      FileInputFormat.setInputPaths(newjob, dir);
+      newjob.setInputFormat(inputFormat.getClass());
+      InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
+      for (InputSplit is : iss) {
+        result.add(new HiveInputSplit(is, inputFormatClass.getName()));
+      }
+    }
+    return result.toArray(new HiveInputSplit[result.size()]);
+  }
+  
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     String indexFileStr = job.get("hive.index.compact.file");
     l4j.info("index_file is " + indexFileStr);
-    HiveInputSplit[] splits = (HiveInputSplit[]) super
-    .getSplits(job, numSplits);
-
-    if (indexFileStr == null) {
-      return splits;
-    }
 
     HiveCompactIndexResult hiveIndexResult = null;
-    try {
-      hiveIndexResult = new HiveCompactIndexResult(indexFileStr, job);
-    } catch (HiveException e) {
-      // there is
-      l4j.error("Unable to read index so we will go with all the file splits.");
-      e.printStackTrace();
+    if (indexFileStr != null) {
+      try {
+        hiveIndexResult = new HiveCompactIndexResult(indexFileStr, job);
+      } catch (HiveException e) {
+        l4j.error("Unable to read index..");
+        throw new IOException(e);
+      }
+
+      Set<String> inputFiles = hiveIndexResult.buckets.keySet();
+      Iterator<String> iter = inputFiles.iterator();
+      boolean first = true;
+      StringBuilder newInputPaths = new StringBuilder();
+      while(iter.hasNext()) {
+        String path = iter.next();
+        if (path.trim().equalsIgnoreCase(""))
+          continue;
+        if (!first) {
+          newInputPaths.append(",");
+        } else {
+          first = false;
+        }
+        newInputPaths.append(path);
+      }
+
+      FileInputFormat.setInputPaths(job, newInputPaths.toString());
+    } else {
+      return super.getSplits(job, numSplits);
     }
+    
+    HiveInputSplit[] splits = (HiveInputSplit[]) this.doGetSplits(job, numSplits);
 
     ArrayList<HiveInputSplit> newSplits = new ArrayList<HiveInputSplit>(
         numSplits);
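
[Editor's note] Two behavior changes are visible in the hunk above: when
hive.index.compact.file is unset, the method now returns super.getSplits(job, numSplits)
unchanged, and when the index result cannot be read, the old fallback of logging the
HiveException and scanning all file splits is replaced by rethrowing it as an IOException,
so a broken index now fails the query instead of silently reading everything. For context,
here is an illustrative client-side sketch of opting into the indexed path; the property
name comes from the patch, while the class and the index-result path are made up.

  // Usage sketch (illustrative, not from the commit).
  import org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat;
  import org.apache.hadoop.mapred.JobConf;

  public class CompactIndexJobSetup {
    public static void main(String[] args) {
      JobConf job = new JobConf();
      // With this property set, getSplits() reads the index result and
      // narrows the input paths to the indexed bucket files; if it is
      // unset, the patched code falls back to super.getSplits().
      job.set("hive.index.compact.file", "/tmp/hive_index_result");
      job.setInputFormat(HiveCompactIndexInputFormat.class);
    }
  }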