You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/10/18 04:13:11 UTC
svn commit: r1533311 - /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
Author: gunther
Date: Fri Oct 18 02:13:11 2013
New Revision: 1533311
URL: http://svn.apache.org/r1533311
Log:
HIVE-5586: Call createSplits with multiple paths if partition specs match on Tez (Gunther Hagleitner)
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1533311&r1=1533310&r2=1533311&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Fri Oct 18 02:13:11 2013
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -252,6 +254,22 @@ public class HiveInputFormat<K extends W
pathToPartitionInfo = mrwork.getPathToPartitionInfo();
}
+  /**
+   * Computes the input splits for one group of input directories that share the same
+   * input format class, table descriptor and table scan operator, and appends them to
+   * {@code result}.
+   *
+   * <p>Side effects: {@code conf} is modified in place. The table's job properties are
+   * copied into it, and filter pushdown information is set when {@code tableScan} is
+   * non-null. Its input paths and input format are overwritten with this group's values.
+   *
+   * @param dirs the directories in this group; all are passed to a single getSplits call
+   * @param tableScan the table scan operator for the group, or {@code null} when none was
+   *     identified for these paths
+   * @param conf job configuration used for the getSplits call (mutated, see above)
+   * @param inputFormat the input format instance that computes the underlying splits
+   * @param inputFormatClass class whose name is recorded in each wrapping HiveInputSplit
+   * @param splits split-count hint passed through to {@code InputFormat.getSplits}
+   * @param table table descriptor whose job properties are copied into {@code conf}
+   * @param result list that receives one HiveInputSplit per underlying split
+   * @throws IOException propagated from the calls above, e.g. {@code InputFormat.getSplits}
+   */
+  private void addSplitsForGroup(List<Path> dirs, TableScanOperator tableScan, JobConf conf,
+      InputFormat inputFormat, Class<? extends InputFormat> inputFormatClass, int splits,
+      TableDesc table, List<InputSplit> result) throws IOException {
+
+    Utilities.copyTableJobPropertiesToConf(table, conf);
+
+    // Make filter pushdown information available to getSplits.
+    // getSplits passes a null tableScan when a path has no single alias, or when its alias
+    // is not bound to a TableScanOperator. The pre-grouping code skipped pushFilters in that
+    // case, so keep doing so instead of handing it a null operator.
+    // NOTE(review): the caller reuses one conf for every group. When tableScan is null,
+    // filter settings pushed for an earlier group may still be present here. Confirm
+    // whether they should be cleared.
+    if (tableScan != null) {
+      pushFilters(conf, tableScan);
+    }
+
+    FileInputFormat.setInputPaths(conf, dirs.toArray(new Path[dirs.size()]));
+    conf.setInputFormat(inputFormat.getClass());
+    InputSplit[] iss = inputFormat.getSplits(conf, splits);
+    for (InputSplit is : iss) {
+      result.add(new HiveInputSplit(is, inputFormatClass.getName()));
+    }
+  }
+
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
PerfLogger perfLogger = PerfLogger.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
@@ -262,36 +280,60 @@ public class HiveInputFormat<K extends W
throw new IOException("No input paths specified in job");
}
JobConf newjob = new JobConf(job);
- ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+ List<InputSplit> result = new ArrayList<InputSplit>();
+
+ List<Path> currentDirs = new ArrayList<Path>();
+ Class<? extends InputFormat> currentInputFormatClass = null;
+ TableDesc currentTable = null;
+ TableScanOperator currentTableScan = null;
// for each dir, get the InputFormat, and do getSplits.
for (Path dir : dirs) {
PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
- // create a new InputFormat instance if this is the first time to see this
- // class
- Class inputFormatClass = part.getInputFileFormatClass();
- InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
- Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), newjob);
+ Class<? extends InputFormat> inputFormatClass = part.getInputFileFormatClass();
+ TableDesc table = part.getTableDesc();
+ TableScanOperator tableScan = null;
+
+ List<String> aliases =
+ mrwork.getPathToAliases().get(dir.toUri().toString());
// Make filter pushdown information available to getSplits.
- ArrayList<String> aliases =
- mrwork.getPathToAliases().get(dir.toUri().toString());
if ((aliases != null) && (aliases.size() == 1)) {
Operator op = mrwork.getAliasToWork().get(aliases.get(0));
if ((op != null) && (op instanceof TableScanOperator)) {
- TableScanOperator tableScan = (TableScanOperator) op;
- pushFilters(newjob, tableScan);
+ tableScan = (TableScanOperator) op;
}
}
- FileInputFormat.setInputPaths(newjob, dir);
- newjob.setInputFormat(inputFormat.getClass());
- InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
- for (InputSplit is : iss) {
- result.add(new HiveInputSplit(is, inputFormatClass.getName()));
+ if (!currentDirs.isEmpty() &&
+ inputFormatClass.equals(currentInputFormatClass) &&
+ table.equals(currentTable) &&
+ tableScan == currentTableScan) {
+ currentDirs.add(dir);
+ continue;
+ }
+
+ if (!currentDirs.isEmpty()) {
+ LOG.info("Generating splits");
+ addSplitsForGroup(currentDirs, currentTableScan, newjob,
+ getInputFormatFromCache(currentInputFormatClass, job),
+ currentInputFormatClass, currentDirs.size()*(numSplits / dirs.length),
+ currentTable, result);
}
+
+ currentDirs.clear();
+ currentDirs.add(dir);
+ currentTableScan = tableScan;
+ currentTable = table;
+ currentInputFormatClass = inputFormatClass;
}
+ LOG.info("Generating splits");
+ addSplitsForGroup(currentDirs, currentTableScan, newjob,
+ getInputFormatFromCache(currentInputFormatClass, job),
+ currentInputFormatClass, currentDirs.size()*(numSplits / dirs.length),
+ currentTable, result);
+
LOG.info("number of splits " + result.size());
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
return result.toArray(new HiveInputSplit[result.size()]);