You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/10/28 21:55:07 UTC
svn commit: r1536517 - in
/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io:
BucketizedHiveInputFormat.java CombineHiveInputFormat.java
Author: hashutosh
Date: Mon Oct 28 20:55:07 2013
New Revision: 1536517
URL: http://svn.apache.org/r1536517
Log:
HIVE-5554 : add more comments to CombineHiveInputFormat.java, BucketizedHiveInputFormat.java (Thejas Nair via Ashutosh Chauhan)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java?rev=1536517&r1=1536516&r2=1536517&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java Mon Oct 28 20:55:07 2013
@@ -77,16 +77,23 @@ public class BucketizedHiveInputFormat<K
return rr;
}
- protected FileStatus[] listStatus(JobConf job, Path path) throws IOException {
+ /**
+ * Recursively lists status for all files starting from the directory dir
+ * @param job
+ * @param dir
+ * @return
+ * @throws IOException
+ */
+ protected FileStatus[] listStatus(JobConf job, Path dir) throws IOException {
ArrayList<FileStatus> result = new ArrayList<FileStatus>();
List<IOException> errors = new ArrayList<IOException>();
- FileSystem fs = path.getFileSystem(job);
- FileStatus[] matches = fs.globStatus(path);
+ FileSystem fs = dir.getFileSystem(job);
+ FileStatus[] matches = fs.globStatus(dir);
if (matches == null) {
- errors.add(new IOException("Input path does not exist: " + path));
+ errors.add(new IOException("Input path does not exist: " + dir));
} else if (matches.length == 0) {
- errors.add(new IOException("Input Pattern " + path + " matches 0 files"));
+ errors.add(new IOException("Input Pattern " + dir + " matches 0 files"));
} else {
for (FileStatus globStat : matches) {
FileUtils.listStatusRecursively(fs, globStat, result);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1536517&r1=1536516&r2=1536517&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Mon Oct 28 20:55:07 2013
@@ -317,6 +317,8 @@ public class CombineHiveInputFormat<K ex
// https://issues.apache.org/jira/browse/MAPREDUCE-1597 is fixed.
// Hadoop does not handle non-splittable files correctly for CombineFileInputFormat,
// so don't use CombineFileInputFormat for non-splittable files
+
+ //i.e., don't combine if inputformat is a TextInputFormat and has compression turned on
FileSystem inpFs = path.getFileSystem(job);
if (inputFormat instanceof TextInputFormat) {
@@ -327,6 +329,7 @@ public class CombineHiveInputFormat<K ex
if (fStats.isDir()) {
dirs.offer(path);
} else if ((new CompressionCodecFactory(job)).getCodec(path) != null) {
+ //if compression codec is set, use HiveInputFormat.getSplits (don't combine)
splits = super.getSplits(job, numSplits);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
return splits;
@@ -340,6 +343,7 @@ public class CombineHiveInputFormat<K ex
dirs.offer(fStatus[idx].getPath());
} else if ((new CompressionCodecFactory(job)).getCodec(
fStatus[idx].getPath()) != null) {
+ //if compression codec is set, use HiveInputFormat.getSplits (don't combine)
splits = super.getSplits(job, numSplits);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
return splits;
@@ -348,7 +352,7 @@ public class CombineHiveInputFormat<K ex
}
}
}
-
+ //don't combine if inputformat is a SymlinkTextInputFormat
if (inputFormat instanceof SymlinkTextInputFormat) {
splits = super.getSplits(job, numSplits);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
@@ -362,6 +366,10 @@ public class CombineHiveInputFormat<K ex
List<Operator<? extends OperatorDesc>> opList = null;
if (!mrwork.isMapperCannotSpanPartns()) {
+ //if mapper can span partitions, make sure a split does not contain multiple
+ // opList + inputFormatClassName + deserializerClassName combination
+ // This is done using the Map of CombinePathInputFormat to PathFilter
+
opList = HiveFileFormatUtils.doGetWorksFromPath(
pathToAliases, aliasToWork, filterPath);
CombinePathInputFormat combinePathInputFormat =
@@ -397,6 +405,9 @@ public class CombineHiveInputFormat<K ex
// Processing directories
List<InputSplitShim> iss = new ArrayList<InputSplitShim>();
if (!mrwork.isMapperCannotSpanPartns()) {
+ //mapper can span partitions
+ //combine into as few as one split, subject to the PathFilters set
+ // using combine.createPool.
iss = Arrays.asList(combine.getSplits(job, 1));
} else {
for (Path path : inpDirs) {