You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@hive.apache.org by na...@apache.org on 2010/08/23 23:32:55 UTC
svn commit: r988318 - in /hadoop/hive/trunk: CHANGES.txt
ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java
Author: namit
Date: Mon Aug 23 21:32:55 2010
New Revision: 988318
URL: http://svn.apache.org/viewvc?rev=988318&view=rev
Log:
HIVE-1581. CompactIndexInputFormat should create splits only for files listed in the index output file
(He Yongqiang via namit)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=988318&r1=988317&r2=988318&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Mon Aug 23 21:32:55 2010
@@ -96,6 +96,9 @@ Trunk - Unreleased
versions 0.20.4 0.20.5 and cloudera CDH3 version
(Basab Maulik via jvs)
+ HIVE-1581. CompactIndexInputFormat should create split only for files in the index output file
+ (He Yongqiang via namit)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java?rev=988318&r1=988317&r2=988318&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/index/compact/HiveCompactIndexInputFormat.java Mon Aug 23 21:32:55 2010
@@ -2,13 +2,22 @@ package org.apache.hadoop.hive.ql.index.
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.IOPrepareCache;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -20,25 +29,74 @@ public class HiveCompactIndexInputFormat
super();
}
+ public InputSplit[] doGetSplits(JobConf job, int numSplits) throws IOException {
+
+ super.init(job);
+
+ Path[] dirs = FileInputFormat.getInputPaths(job);
+ if (dirs.length == 0) {
+ throw new IOException("No input paths specified in job");
+ }
+ JobConf newjob = new JobConf(job);
+ ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+
+ // for each dir, get the InputFormat, and do getSplits.
+ for (Path dir : dirs) {
+ PartitionDesc part = HiveFileFormatUtils
+ .getPartitionDescFromPathRecursively(pathToPartitionInfo, dir,
+ IOPrepareCache.get().allocatePartitionDescMap());
+ // create a new InputFormat instance if this is the first time to see this
+ // class
+ Class inputFormatClass = part.getInputFileFormatClass();
+ InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+ Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), newjob);
+
+ FileInputFormat.setInputPaths(newjob, dir);
+ newjob.setInputFormat(inputFormat.getClass());
+ InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
+ for (InputSplit is : iss) {
+ result.add(new HiveInputSplit(is, inputFormatClass.getName()));
+ }
+ }
+ return result.toArray(new HiveInputSplit[result.size()]);
+ }
+
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
String indexFileStr = job.get("hive.index.compact.file");
l4j.info("index_file is " + indexFileStr);
- HiveInputSplit[] splits = (HiveInputSplit[]) super
- .getSplits(job, numSplits);
-
- if (indexFileStr == null) {
- return splits;
- }
HiveCompactIndexResult hiveIndexResult = null;
- try {
- hiveIndexResult = new HiveCompactIndexResult(indexFileStr, job);
- } catch (HiveException e) {
- // there is
- l4j.error("Unable to read index so we will go with all the file splits.");
- e.printStackTrace();
+ if (indexFileStr != null) {
+ try {
+ hiveIndexResult = new HiveCompactIndexResult(indexFileStr, job);
+ } catch (HiveException e) {
+ l4j.error("Unable to read index..");
+ throw new IOException(e);
+ }
+
+ Set<String> inputFiles = hiveIndexResult.buckets.keySet();
+ Iterator<String> iter = inputFiles.iterator();
+ boolean first = true;
+ StringBuilder newInputPaths = new StringBuilder();
+ while(iter.hasNext()) {
+ String path = iter.next();
+ if (path.trim().equalsIgnoreCase(""))
+ continue;
+ if (!first) {
+ newInputPaths.append(",");
+ } else {
+ first = false;
+ }
+ newInputPaths.append(path);
+ }
+
+ FileInputFormat.setInputPaths(job, newInputPaths.toString());
+ } else {
+ return super.getSplits(job, numSplits);
}
+
+ HiveInputSplit[] splits = (HiveInputSplit[]) this.doGetSplits(job, numSplits);
ArrayList<HiveInputSplit> newSplits = new ArrayList<HiveInputSplit>(
numSplits);