You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2010/09/07 11:06:13 UTC
svn commit: r993293 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
Author: amareshwari
Date: Tue Sep 7 09:06:12 2010
New Revision: 993293
URL: http://svn.apache.org/viewvc?rev=993293&view=rev
Log:
MAPREDUCE-1597. Fixes CombineFileInputFormat to work with non-splittable files. Contributed by Amareshwari Sriramadasu
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=993293&r1=993292&r2=993293&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Sep 7 09:06:12 2010
@@ -283,6 +283,9 @@ Trunk (unreleased changes)
MAPREDUCE-1975. Fixes unnecessary InterruptedException log in gridmix.
(Ravi Gummadi via amareshwari)
+ MAPREDUCE-1597. Fixes CombineFileInputFormat to work with non-splittable
+ files. (amareshwari)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=993293&r1=993292&r2=993293&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Tue Sep 7 09:06:12 2010
@@ -38,6 +38,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
@@ -140,6 +143,16 @@ public abstract class CombineFileInputFo
}
pools.add(multi);
}
+
+  /**
+   * Reports whether {@code file} may be broken into more than one block.
+   *
+   * A file for which the codec factory returns no codec is treated as
+   * splittable; otherwise the file is splittable only when its codec is a
+   * {@link SplittableCompressionCodec}.
+   *
+   * @param context job context supplying the configuration for the codec lookup
+   * @param file path of the input file to examine
+   * @return {@code true} if the file can be split, {@code false} otherwise
+   */
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    final CompressionCodecFactory codecs =
+        new CompressionCodecFactory(context.getConfiguration());
+    final CompressionCodec matched = codecs.getCodec(file);
+    return matched == null || matched instanceof SplittableCompressionCodec;
+  }
/**
* default constructor
@@ -223,12 +236,12 @@ public abstract class CombineFileInputFo
}
}
// create splits for all files in this pool.
- getMoreSplits(conf, myPaths.toArray(new Path[myPaths.size()]),
+ getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
maxSize, minSizeNode, minSizeRack, splits);
}
// create splits for all files that are not in any pool.
- getMoreSplits(conf, newpaths.toArray(new Path[newpaths.size()]),
+ getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]),
maxSize, minSizeNode, minSizeRack, splits);
// free up rackToNodes map
@@ -239,10 +252,11 @@ public abstract class CombineFileInputFo
/**
* Return all the splits in the specified set of paths
*/
- private void getMoreSplits(Configuration conf, Path[] paths,
+ private void getMoreSplits(JobContext job, Path[] paths,
long maxSize, long minSizeNode, long minSizeRack,
List<InputSplit> splits)
throws IOException {
+ Configuration conf = job.getConfiguration();
// all blocks for all the files in input set
OneFileInfo[] files;
@@ -267,9 +281,9 @@ public abstract class CombineFileInputFo
// populate all the blocks for all files
long totLength = 0;
for (int i = 0; i < paths.length; i++) {
- files[i] = new OneFileInfo(paths[i], conf,
- rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes,
- maxSize);
+ files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, paths[i]),
+ rackToBlocks, blockToNodes, nodeToBlocks,
+ rackToNodes, maxSize);
totLength += files[i].getLength();
}
@@ -465,6 +479,7 @@ public abstract class CombineFileInputFo
private OneBlockInfo[] blocks; // all blocks in this file
OneFileInfo(Path path, Configuration conf,
+ boolean isSplitable,
HashMap<String, List<OneBlockInfo>> rackToBlocks,
HashMap<OneBlockInfo, String[]> blockToNodes,
HashMap<String, List<OneBlockInfo>> nodeToBlocks,
@@ -482,69 +497,78 @@ public abstract class CombineFileInputFo
if (locations == null) {
blocks = new OneBlockInfo[0];
} else {
- ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(locations.length);
- for (int i = 0; i < locations.length; i++) {
-
- fileSize += locations[i].getLength();
-
- // each split can be a maximum of maxSize
- long left = locations[i].getLength();
- long myOffset = locations[i].getOffset();
- long myLength = 0;
- while (left > 0) {
- if (maxSize == 0) {
- myLength = left;
- } else {
- if (left > maxSize && left < 2 * maxSize) {
- // if remainder is between max and 2*max - then
- // instead of creating splits of size max, left-max we
- // create splits of size left/2 and left/2. This is
- // a heuristic to avoid creating really really small
- // splits.
- myLength = left / 2;
+ if (!isSplitable) {
+ // if the file is not splitable, just create the one block with
+ // full file length
+ blocks = new OneBlockInfo[1];
+ fileSize = stat.getLen();
+ blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
+ .getHosts(), locations[0].getTopologyPaths());
+ } else {
+ ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
+ locations.length);
+ for (int i = 0; i < locations.length; i++) {
+ fileSize += locations[i].getLength();
+
+ // each split can be a maximum of maxSize
+ long left = locations[i].getLength();
+ long myOffset = locations[i].getOffset();
+ long myLength = 0;
+ while (left > 0) {
+ if (maxSize == 0) {
+ myLength = left;
} else {
- myLength = Math.min(maxSize, left);
+ if (left > maxSize && left < 2 * maxSize) {
+ // if remainder is between max and 2*max - then
+ // instead of creating splits of size max, left-max we
+ // create splits of size left/2 and left/2. This is
+ // a heuristic to avoid creating really really small
+ // splits.
+ myLength = left / 2;
+ } else {
+ myLength = Math.min(maxSize, left);
+ }
}
+ OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
+ myLength, locations[i].getHosts(), locations[i]
+ .getTopologyPaths());
+ left -= myLength;
+ myOffset += myLength;
+
+ blocksList.add(oneblock);
}
- OneBlockInfo oneblock = new OneBlockInfo(path,
- myOffset,
- myLength,
- locations[i].getHosts(),
- locations[i].getTopologyPaths());
- left -= myLength;
- myOffset += myLength;
-
- blocksList.add(oneblock);
-
- // add this block to the block --> node locations map
- blockToNodes.put(oneblock, oneblock.hosts);
-
- // add this block to the rack --> block map
- for (int j = 0; j < oneblock.racks.length; j++) {
- String rack = oneblock.racks[j];
- List<OneBlockInfo> blklist = rackToBlocks.get(rack);
- if (blklist == null) {
- blklist = new ArrayList<OneBlockInfo>();
- rackToBlocks.put(rack, blklist);
- }
- blklist.add(oneblock);
- // Add this host to rackToNodes map
- addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
- }
-
- // add this block to the node --> block map
- for (int j = 0; j < oneblock.hosts.length; j++) {
- String node = oneblock.hosts[j];
- List<OneBlockInfo> blklist = nodeToBlocks.get(node);
- if (blklist == null) {
- blklist = new ArrayList<OneBlockInfo>();
- nodeToBlocks.put(node, blklist);
- }
- blklist.add(oneblock);
+ }
+ blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
+ }
+
+ for (OneBlockInfo oneblock : blocks) {
+ // add this block to the block --> node locations map
+ blockToNodes.put(oneblock, oneblock.hosts);
+
+ // add this block to the rack --> block map
+ for (int j = 0; j < oneblock.racks.length; j++) {
+ String rack = oneblock.racks[j];
+ List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+ if (blklist == null) {
+ blklist = new ArrayList<OneBlockInfo>();
+ rackToBlocks.put(rack, blklist);
+ }
+ blklist.add(oneblock);
+ // Add this host to rackToNodes map
+ addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
+ }
+
+ // add this block to the node --> block map
+ for (int j = 0; j < oneblock.hosts.length; j++) {
+ String node = oneblock.hosts[j];
+ List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+ if (blklist == null) {
+ blklist = new ArrayList<OneBlockInfo>();
+ nodeToBlocks.put(node, blklist);
}
+ blklist.add(oneblock);
}
}
- blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
}
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=993293&r1=993292&r2=993293&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Tue Sep 7 09:06:12 2010
@@ -18,8 +18,10 @@
package org.apache.hadoop.mapreduce.lib.input;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.List;
import java.util.ArrayList;
+import java.util.zip.GZIPOutputStream;
import junit.framework.TestCase;
@@ -648,12 +650,376 @@ public class TestCombineFileInputFormat
FSDataOutputStream stm = fileSys.create(name, true,
conf.getInt("io.file.buffer.size", 4096),
replication, (long)BLOCKSIZE);
+ writeDataAndSetReplication(fileSys, name, stm, replication, numBlocks);
+ }
+
+ // Creates the gzip file and return the FileStatus
+ static FileStatus writeGzipFile(Configuration conf, Path name,
+ short replication, int numBlocks) throws IOException {
+ FileSystem fileSys = FileSystem.get(conf);
+
+ GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf
+ .getInt("io.file.buffer.size", 4096), replication, (long) BLOCKSIZE));
+ writeDataAndSetReplication(fileSys, name, out, replication, numBlocks);
+ return fileSys.getFileStatus(name);
+ }
+
+ private static void writeDataAndSetReplication(FileSystem fileSys, Path name, // shared tail of the file writers: write the data, close the stream, then wait on replication
+ OutputStream out, short replication, int numBlocks) throws IOException { // accepts any OutputStream, so callers can pass a wrapping stream such as the GZIPOutputStream built in writeGzipFile
for (int i = 0; i < numBlocks; i++) {
- stm.write(databuf);
+ out.write(databuf); // databuf is written once per iteration, numBlocks times in total
}
- stm.close();
+ out.close(); // NOTE(review): not reached if a write above throws, leaving the stream open; consider try/finally
DFSTestUtil.waitReplication(fileSys, name, replication);
}
+
+  /**
+   * Checks split placement when every input is a gzip file.
+   *
+   * Datanodes are brought up one rack at a time and gzip files are written
+   * in between, so each file ends up on a known set of nodes. After each step
+   * the splits are computed with different min/max size settings and pools.
+   * Every file is expected to appear in a split as a single chunk that starts
+   * at offset 0 and covers the full file length.
+   *
+   * @throws IOException if the mini cluster or any file operation fails
+   */
+  public void testSplitPlacementForCompressedFiles() throws IOException {
+    MiniDFSCluster dfs = null;
+    FileSystem fileSys = null;
+    try {
+      /* Start 3 datanodes, one each in rack r1, r2, r3. Create five gzipped
+       *  files
+       *  1) file1 and file5, just after starting the datanode on r1, with
+       *     a repl factor of 1, and,
+       *  2) file2, just after starting the datanode on r2, with
+       *     a repl factor of 2, and,
+       *  3) file3, file4 after starting all three datanodes, with a repl
+       *     factor of 3.
+       *  At the end, file1, file5 will be present on only datanode1, file2 will
+       *  be present on datanode 1 and datanode2 and
+       *  file3, file4 will be present on all datanodes.
+       */
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+      dfs.waitActive();
+
+      fileSys = dfs.getFileSystem();
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+      Path file1 = new Path(dir1 + "/file1.gz");
+      FileStatus f1 = writeGzipFile(conf, file1, (short)1, 1);
+      // create another file on the same datanode
+      Path file5 = new Path(dir5 + "/file5.gz");
+      FileStatus f5 = writeGzipFile(conf, file5, (short)1, 1);
+      // split it using a CombinedFile input format
+      DummyInputFormat inFormat = new DummyInputFormat();
+      Job job = Job.getInstance(conf);
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
+      List<InputSplit> splits = inFormat.getSplits(job);
+      System.out.println("Made splits(Test0): " + splits.size());
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test0): " + split);
+      }
+      // expected value goes first so a failure message reads correctly
+      assertEquals(1, splits.size());
+      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(file5.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f5.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+      dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
+      dfs.waitActive();
+
+      // create file on two datanodes.
+      Path file2 = new Path(dir2 + "/file2.gz");
+      FileStatus f2 = writeGzipFile(conf, file2, (short)2, 2);
+
+      // split it using a CombinedFile input format
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
+      inFormat.setMinSplitSizeRack(f1.getLen());
+      splits = inFormat.getSplits(job);
+      System.out.println("Made splits(Test1): " + splits.size());
+
+      // make sure that each split has different locations
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test1): " + split);
+      }
+      assertEquals(2, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // create another file on 3 datanodes and 3 racks.
+      dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
+      dfs.waitActive();
+      Path file3 = new Path(dir3 + "/file3.gz");
+      FileStatus f3 = writeGzipFile(conf, file3, (short)3, 3);
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
+      inFormat.setMinSplitSizeRack(f1.getLen());
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test2): " + split);
+      }
+      assertEquals(3, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // create file4 on all three racks
+      Path file4 = new Path(dir4 + "/file4.gz");
+      FileStatus f4 = writeGzipFile(conf, file4, (short)3, 3);
+      inFormat = new DummyInputFormat();
+      FileInputFormat.setInputPaths(job,
+          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      inFormat.setMinSplitSizeRack(f1.getLen());
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test3): " + split);
+      }
+      assertEquals(3, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f4.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size is file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(f1.getLen());
+      inFormat.setMaxSplitSize(f1.getLen());
+      FileInputFormat.setInputPaths(job,
+          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test4): " + split);
+      }
+      assertEquals(4, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f4.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(3);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size is twice file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(f1.getLen());
+      inFormat.setMaxSplitSize(2 * f1.getLen());
+      FileInputFormat.setInputPaths(job,
+          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test5): " + split);
+      }
+      assertEquals(3, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f4.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size is 4 times file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(2 * f1.getLen());
+      inFormat.setMaxSplitSize(4 * f1.getLen());
+      FileInputFormat.setInputPaths(job,
+          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test6): " + split);
+      }
+      assertEquals(2, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f3.getLen(), fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f4.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+      // The offset must be exactly 0. A three-argument call with BLOCKSIZE
+      // binds to JUnit's tolerance-based overload and would accept any
+      // offset within BLOCKSIZE of 0.
+      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(f2.getLen(), fileSplit.getLength(1));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      // maximum split size and min-split-size per rack is 4 times file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMaxSplitSize(4 * f1.getLen());
+      inFormat.setMinSplitSizeRack(4 * f1.getLen());
+      FileInputFormat.setInputPaths(job,
+          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test7): " + split);
+      }
+      assertEquals(1, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(4, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+      // minimum split size per node is 4 times file1's length
+      inFormat = new DummyInputFormat();
+      inFormat.setMinSplitSizeNode(4 * f1.getLen());
+      FileInputFormat.setInputPaths(job,
+          dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test8): " + split);
+      }
+      assertEquals(1, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(4, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+      // Rack 1 has file1, file2 and file3 and file4
+      // Rack 2 has file2 and file3 and file4
+      // Rack 3 has file3 and file4
+      // setup a filter so that only file1 and file2 can be combined
+      inFormat = new DummyInputFormat();
+      FileInputFormat.addInputPath(job, inDir);
+      inFormat.setMinSplitSizeRack(1); // everything is at least rack local
+      inFormat.createPool(new TestFilter(dir1),
+                          new TestFilter(dir2));
+      splits = inFormat.getSplits(job);
+      for (InputSplit split : splits) {
+        System.out.println("File split(Test9): " + split);
+      }
+      assertEquals(3, splits.size());
+      fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      fileSplit = (CombineFileSplit) splits.get(1);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+      fileSplit = (CombineFileSplit) splits.get(2);
+      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(1, fileSplit.getLocations().length);
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+
+      // measure performance when there are multiple pools and
+      // many files in each pool.
+      int numPools = 100;
+      int numFiles = 1000;
+      DummyInputFormat1 inFormat1 = new DummyInputFormat1();
+      // NOTE(review): setInputPaths appears to replace the job's input list
+      // on each call, so this loop likely leaves file1 as the only input and
+      // numFiles only affects the log line below - confirm the intent.
+      for (int i = 0; i < numFiles; i++) {
+        FileInputFormat.setInputPaths(job, file1);
+      }
+      inFormat1.setMinSplitSizeRack(1); // everything is at least rack local
+      final Path dirNoMatch1 = new Path(inDir, "/dirxx");
+      final Path dirNoMatch2 = new Path(inDir, "/diryy");
+      for (int i = 0; i < numPools; i++) {
+        inFormat1.createPool(new TestFilter(dirNoMatch1),
+                             new TestFilter(dirNoMatch2));
+      }
+      long start = System.currentTimeMillis();
+      splits = inFormat1.getSplits(job);
+      long end = System.currentTimeMillis();
+      System.out.println("Elapsed time for " + numPools + " pools " +
+                         " and " + numFiles + " files is " +
+                         ((end - start)) + " milli seconds.");
+    } finally {
+      // always tear the mini cluster down, even when an assertion fails
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
static class TestFilter implements PathFilter {
private Path p;