Posted to common-commits@hadoop.apache.org by en...@apache.org on 2007/09/11 12:50:34 UTC
svn commit: r574546 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java
src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java
Author: enis
Date: Tue Sep 11 03:50:34 2007
New Revision: 574546
URL: http://svn.apache.org/viewvc?rev=574546&view=rev
Log:
HADOOP-1818. Fix MultiFileInputFormat so that it does not return
empty splits. Contributed by Thomas Friol.
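[Illustration only, not part of the commit.] Before this change, getSplits() always allocated an array of numSplits MultiFileSplit entries, so a job with fewer input paths than requested splits received splits covering zero files. A minimal sketch of how the symptom can be observed, assuming a trivial concrete subclass named DummyMultiFileInputFormat (as in the test added below) and a hypothetical input directory; the JobConf.setInputPath call reflects the old mapred API of that era and is an assumption here:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MultiFileInputFormat;

    public class EmptySplitCheck {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        // Hypothetical directory containing only two small input files.
        job.setInputPath(new Path("/tmp/multifile-input"));
        // DummyMultiFileInputFormat: assumed trivial subclass, as used in the test below.
        MultiFileInputFormat format = new DummyMultiFileInputFormat();
        // Ask for more splits than there are input files.
        InputSplit[] splits = format.getSplits(job, 4);
        // Before HADOOP-1818: length 4, two of the splits cover no files.
        // After the fix: only non-empty splits are returned (length 2).
        System.out.println("split count = " + splits.length);
      }
    }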
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=574546&r1=574545&r2=574546&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Sep 11 03:50:34 2007
@@ -108,6 +108,9 @@
HADOOP-1853. Fix contrib/streaming to accept multiple -cacheFile
options. (Prachi Gupta via cutting)
+ HADOOP-1818. Fix MultiFileInputFormat so that it does not return
+ empty splits when numPaths < numSplits. (Thomas Friol via enis)
+
IMPROVEMENTS
HADOOP-1266. Remove dependency of package org.apache.hadoop.net on
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java?rev=574546&r1=574545&r2=574546&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java Tue Sep 11 03:50:34 2007
@@ -19,6 +19,8 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -42,35 +44,40 @@
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
- MultiFileSplit[] splits = new MultiFileSplit[numSplits];
Path[] paths = listPaths(job);
- long[] lengths = new long[paths.length];
- long totLength = 0;
- for(int i=0; i<paths.length; i++) {
- FileSystem fs = paths[i].getFileSystem(job);
- lengths[i] = fs.getContentLength(paths[i]);
- totLength += lengths[i];
- }
- float avgLengthPerSplit = ((float)totLength) / numSplits;
- long cumulativeLength = 0;
+ List<MultiFileSplit> splits = new ArrayList<MultiFileSplit>(Math.min(numSplits, paths.length));
+ if (paths.length != 0) {
+ // HADOOP-1818: Manage splits only if there are paths
+ long[] lengths = new long[paths.length];
+ long totLength = 0;
+ for(int i=0; i<paths.length; i++) {
+ FileSystem fs = paths[i].getFileSystem(job);
+ lengths[i] = fs.getContentLength(paths[i]);
+ totLength += lengths[i];
+ }
+ float avgLengthPerSplit = ((float)totLength) / numSplits;
+ long cumulativeLength = 0;
- int startIndex = 0;
+ int startIndex = 0;
- for(int i=0; i<numSplits; i++) {
- int splitSize = findSize(i, avgLengthPerSplit, cumulativeLength
- , startIndex, lengths);
- Path[] splitPaths = new Path[splitSize];
- long[] splitLengths = new long[splitSize];
- System.arraycopy(paths, startIndex, splitPaths , 0, splitSize);
- System.arraycopy(lengths, startIndex, splitLengths , 0, splitSize);
- splits[i] = new MultiFileSplit(job, splitPaths, splitLengths);
- startIndex += splitSize;
- for(long l: splitLengths) {
- cumulativeLength += l;
+ for(int i=0; i<numSplits; i++) {
+ int splitSize = findSize(i, avgLengthPerSplit, cumulativeLength
+ , startIndex, lengths);
+ if (splitSize != 0) {
+ // HADOOP-1818: Manage split only if split size is not equal to 0
+ Path[] splitPaths = new Path[splitSize];
+ long[] splitLengths = new long[splitSize];
+ System.arraycopy(paths, startIndex, splitPaths , 0, splitSize);
+ System.arraycopy(lengths, startIndex, splitLengths , 0, splitSize);
+ splits.add(new MultiFileSplit(job, splitPaths, splitLengths));
+ startIndex += splitSize;
+ for(long l: splitLengths) {
+ cumulativeLength += l;
+ }
+ }
}
}
- return splits;
-
+ return splits.toArray(new MultiFileSplit[splits.size()]);
}
private int findSize(int splitIndex, float avgLengthPerSplit
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java?rev=574546&r1=574545&r2=574546&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java Tue Sep 11 03:50:34 2007
@@ -33,6 +33,7 @@
public class TestMultiFileInputFormat extends TestCase{
private static JobConf job = new JobConf();
+
private static final Log LOG = LogFactory.getLog(TestMultiFileInputFormat.class);
private static final int MAX_SPLIT_COUNT = 10000;
@@ -53,7 +54,7 @@
}
}
- private Path initFiles(FileSystem fs, int numFiles) throws IOException{
+ private Path initFiles(FileSystem fs, int numFiles, int numBytes) throws IOException{
Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
Path multiFileDir = new Path(dir, "test.multifile");
fs.delete(multiFileDir);
@@ -62,7 +63,9 @@
for(int i=0; i<numFiles ;i++) {
Path path = new Path(multiFileDir, "file_" + i);
FSDataOutputStream out = fs.create(path);
- int numBytes = rand.nextInt(MAX_BYTES);
+ if (numBytes == -1) {
+ numBytes = rand.nextInt(MAX_BYTES);
+ }
for(int j=0; j< numBytes; j++) {
out.write(rand.nextInt());
}
@@ -92,7 +95,7 @@
for(int numFiles = 1; numFiles< MAX_NUM_FILES ;
numFiles+= (NUM_FILES_INCR / 2) + rand.nextInt(NUM_FILES_INCR / 2)) {
- Path dir = initFiles(fs, numFiles);
+ Path dir = initFiles(fs, numFiles, -1);
BitSet bits = new BitSet(numFiles);
for(int i=1;i< MAX_SPLIT_COUNT ;i+= rand.nextInt(SPLIT_COUNT_INCR) + 1) {
LOG.info("Running for Num Files=" + numFiles + ", split count=" + i);
@@ -119,6 +122,19 @@
fs.delete(dir);
}
LOG.info("Test Finished");
+ }
+
+ public void testFormatWithLessPathsThanSplits() throws Exception {
+ MultiFileInputFormat format = new DummyMultiFileInputFormat();
+ FileSystem fs = FileSystem.getLocal(job);
+
+ // Test with no path
+ initFiles(fs, 0, -1);
+ assertEquals(0, format.getSplits(job, 2).length);
+
+ // Test with 2 paths and 4 splits
+ initFiles(fs, 2, 500);
+ assertEquals(2, format.getSplits(job, 4).length);
}
public static void main(String[] args) throws Exception{