Posted to common-commits@hadoop.apache.org by tu...@apache.org on 2013/03/18 20:11:03 UTC
svn commit: r1457922 - in /hadoop/common/branches/branch-1: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
Author: tucu
Date: Mon Mar 18 19:11:02 2013
New Revision: 1457922
URL: http://svn.apache.org/r1457922
Log:
Reverting MAPREDUCE-5038 (commit 1454129)
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1457922&r1=1457921&r2=1457922&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Mon Mar 18 19:11:02 2013
@@ -531,9 +531,6 @@ Release 1.2.0 - unreleased
HADOOP-9375. Port HADOOP-7290 to branch-1 to fix TestUserGroupInformation
failure. (Xiaobo Peng via suresh)
- MAPREDUCE-5038. old API CombineFileInputFormat missing fixes that are in
- new API. (sandyr via tucu)
-
MAPREDUCE-5049. CombineFileInputFormat counts all compressed files
non-splitable. (sandyr via tucu)
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java?rev=1457922&r1=1457921&r2=1457922&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java Mon Mar 18 19:11:02 2013
@@ -20,9 +20,7 @@ package org.apache.hadoop.mapred.lib;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.HashMap;
import java.util.Set;
@@ -35,8 +33,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.NetworkTopology;
@@ -79,7 +75,7 @@ public abstract class CombineFileInputFo
private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();
// mapping from a rack name to the set of Nodes in the rack
- private HashMap<String, Set<String>> rackToNodes =
+ private static HashMap<String, Set<String>> rackToNodes =
new HashMap<String, Set<String>>();
/**
* Specify the maximum size (in bytes) of each split. Each split is
@@ -132,16 +128,6 @@ public abstract class CombineFileInputFo
pools.add(multi);
}
- @Override
- protected boolean isSplitable(FileSystem fs, Path file) {
- final CompressionCodec codec =
- new CompressionCodecFactory(fs.getConf()).getCodec(file);
- if (null == codec) {
- return true;
- }
- return codec instanceof SplittableCompressionCodec;
- }
-
/**
* default constructor
*/
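[The isSplitable override removed above is what made the old API skip splitting compressed inputs. As a stand-alone illustration of the check it performed (a minimal sketch, not part of this commit; it reuses the two CompressionCodec imports removed in the earlier hunk and assumes a Configuration with the usual codec factory wiring):

    Configuration conf = new Configuration();
    CompressionCodec codec = new CompressionCodecFactory(conf)
        .getCodec(new Path("/data/part-00000.gz"));
    // A file is splitable only if it is uncompressed, or its codec can
    // split mid-stream (SplittableCompressionCodec, e.g. bzip2 in
    // branches where BZip2Codec implements that interface; gzip does not).
    boolean splitable =
        (codec == null) || (codec instanceof SplittableCompressionCodec);

With the override gone, OneFileInfo below once again builds one block per HDFS block location even for gzip files.]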
@@ -195,31 +181,24 @@ public abstract class CombineFileInputFo
if (paths.length == 0) {
return splits.toArray(new CombineFileSplit[splits.size()]);
}
-
- // Convert them to Paths first. This is a costly operation and
- // we should do it first, otherwise we will incur doing it multiple
- // times, one time each for each pool in the next loop.
- List<Path> newpaths = new LinkedList<Path>();
- for (int i = 0; i < paths.length; i++) {
- FileSystem fs = paths[i].getFileSystem(job);
- Path p = fs.makeQualified(paths[i]);
- newpaths.add(p);
- }
- paths = null;
// In one single iteration, process all the paths in a single pool.
- // Processing one pool at a time ensures that a split contains paths
+ // Processing one pool at a time ensures that a split contans paths
// from a single pool only.
for (MultiPathFilter onepool : pools) {
ArrayList<Path> myPaths = new ArrayList<Path>();
// pick one input path. If it matches all the filters in a pool,
// add it to the output set
- for (Iterator<Path> iter = newpaths.iterator(); iter.hasNext();) {
- Path p = iter.next();
+ for (int i = 0; i < paths.length; i++) {
+ if (paths[i] == null) { // already processed
+ continue;
+ }
+ FileSystem fs = paths[i].getFileSystem(job);
+ Path p = new Path(paths[i].toUri().getPath());
if (onepool.accept(p)) {
- myPaths.add(p); // add it to my output set
- iter.remove();
+ myPaths.add(paths[i]); // add it to my output set
+ paths[i] = null; // already processed
}
}
// create splits for all files in this pool.
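[Note what the restored loop above does to each input before filtering: it rebuilds the Path from just the URI's path component, so pool filters see scheme-less paths. A quick illustration (a sketch, not part of the commit; the host and paths are made up):

    Path qualified = new Path("hdfs://nn:8020/user/in/dir1/file1");
    Path stripped = new Path(qualified.toUri().getPath());
    // qualified.toString() -> "hdfs://nn:8020/user/in/dir1/file1"
    // stripped.toString()  -> "/user/in/dir1/file1"

It also means paths[i].getFileSystem(job) runs once per path per pool, the repeated cost that the reverted MAPREDUCE-5038 change had hoisted out of the loop.]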
@@ -227,8 +206,16 @@ public abstract class CombineFileInputFo
maxSize, minSizeNode, minSizeRack, splits);
}
+ // Finally, process all paths that do not belong to any pool.
+ ArrayList<Path> myPaths = new ArrayList<Path>();
+ for (int i = 0; i < paths.length; i++) {
+ if (paths[i] == null) { // already processed
+ continue;
+ }
+ myPaths.add(paths[i]);
+ }
// create splits for all files that are not in any pool.
- getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]),
+ getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
maxSize, minSizeNode, minSizeRack, splits);
// free up rackToNodes map
@@ -267,14 +254,13 @@ public abstract class CombineFileInputFo
// populate all the blocks for all files
long totLength = 0;
for (int i = 0; i < paths.length; i++) {
- files[i] = new OneFileInfo(paths[i], job,
- isSplitable(paths[i].getFileSystem(job), paths[i]),
- rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);
+ files[i] = new OneFileInfo(paths[i], job,
+ rackToBlocks, blockToNodes, nodeToBlocks);
totLength += files[i].getLength();
}
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
- Set<String> nodes = new HashSet<String>();
+ ArrayList<String> nodes = new ArrayList<String>();
long curSplitSize = 0;
// process all nodes and create splits that are local
@@ -327,7 +313,7 @@ public abstract class CombineFileInputFo
// in 'overflow'. After the processing of all racks is complete, these overflow
// blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
- Set<String> racks = new HashSet<String>();
+ ArrayList<String> racks = new ArrayList<String>();
// Process all racks over and over again until there is no more work to do.
while (blockToNodes.size() > 0) {
@@ -432,7 +418,7 @@ public abstract class CombineFileInputFo
*/
private void addCreatedSplit(JobConf job,
List<CombineFileSplit> splitList,
- Collection<String> locations,
+ List<String> locations,
ArrayList<OneBlockInfo> validBlocks) {
// create an input split
Path[] fl = new Path[validBlocks.size()];
@@ -465,11 +451,9 @@ public abstract class CombineFileInputFo
private OneBlockInfo[] blocks; // all blocks in this file
OneFileInfo(Path path, JobConf job,
- boolean isSplitable,
HashMap<String, List<OneBlockInfo>> rackToBlocks,
HashMap<OneBlockInfo, String[]> blockToNodes,
- HashMap<String, List<OneBlockInfo>> nodeToBlocks,
- HashMap<String, Set<String>> rackToNodes)
+ HashMap<String, List<OneBlockInfo>> nodeToBlocks)
throws IOException {
this.fileSize = 0;
@@ -482,28 +466,17 @@ public abstract class CombineFileInputFo
if (locations == null) {
blocks = new OneBlockInfo[0];
} else {
- if (!isSplitable) {
- // if the file is not splitable, just create the one block with
- // full file length
- blocks = new OneBlockInfo[1];
- fileSize = stat.getLen();
- blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
- .getHosts(), locations[0].getTopologyPaths());
- } else {
- blocks = new OneBlockInfo[locations.length];
- for (int i = 0; i < locations.length; i++) {
-
- fileSize += locations[i].getLength();
- OneBlockInfo oneblock = new OneBlockInfo(path,
- locations[i].getOffset(),
- locations[i].getLength(),
- locations[i].getHosts(),
- locations[i].getTopologyPaths());
- blocks[i] = oneblock;
- }
- }
+ blocks = new OneBlockInfo[locations.length];
+ for (int i = 0; i < locations.length; i++) {
+
+ fileSize += locations[i].getLength();
+ OneBlockInfo oneblock = new OneBlockInfo(path,
+ locations[i].getOffset(),
+ locations[i].getLength(),
+ locations[i].getHosts(),
+ locations[i].getTopologyPaths());
+ blocks[i] = oneblock;
- for (OneBlockInfo oneblock : blocks) {
// add this block to the block --> node locations map
blockToNodes.put(oneblock, oneblock.hosts);
@@ -517,8 +490,8 @@ public abstract class CombineFileInputFo
}
blklist.add(oneblock);
// Add this host to rackToNodes map
- addHostToRack(rackToNodes, oneblock.racks[j], oneblock.hosts[j]);
- }
+ addHostToRack(oneblock.racks[j], oneblock.hosts[j]);
+ }
// add this block to the node --> block map
for (int j = 0; j < oneblock.hosts.length; j++) {
@@ -581,8 +554,7 @@ public abstract class CombineFileInputFo
}
}
- private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
- String rack, String host) {
+ private static void addHostToRack(String rack, String host) {
Set<String> hosts = rackToNodes.get(rack);
if (hosts == null) {
hosts = new HashSet<String>();
@@ -590,10 +562,9 @@ public abstract class CombineFileInputFo
}
hosts.add(host);
}
-
- private Set<String> getHosts(Set<String> racks) {
- Set<String> hosts = new HashSet<String>();
+ private static List<String> getHosts(List<String> racks) {
+ List<String> hosts = new ArrayList<String>();
for (String rack : racks) {
hosts.addAll(rackToNodes.get(rack));
}
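[The getHosts and addHostToRack signature changes in this file all follow from restoring the static rackToNodes map at the top of the diff: both methods must be static to read it, so neither can take the map as a parameter any longer. The practical caveat (an observation, not part of the commit) is that static mutable state is shared JVM-wide, so two formats computing splits concurrently would interleave their rack-to-host entries:

    // Both instances read and write the same static map
    // (MyCombineFormat is a hypothetical concrete subclass):
    CombineFileInputFormat<Text, Text> a = new MyCombineFormat();
    CombineFileInputFormat<Text, Text> b = new MyCombineFormat();
    // a.getSplits(jobA, 1) and b.getSplits(jobB, 1) on different threads
    // would mix entries until each clears the map as getSplits returns.

getSplits does clear the map when it finishes (the "free up rackToNodes map" step above), which bounds the sharing to overlapping calls.]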
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java?rev=1457922&r1=1457921&r2=1457922&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java Mon Mar 18 19:11:02 2013
@@ -18,17 +18,12 @@
package org.apache.hadoop.mapred.lib;
import java.io.IOException;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.zip.GZIPOutputStream;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -37,7 +32,6 @@ import org.apache.hadoop.hdfs.DFSTestUti
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
@@ -70,11 +64,9 @@ public class TestCombineFileInputFormat
final Path dir2 = new Path(inDir, "/dir2");
final Path dir3 = new Path(inDir, "/dir3");
final Path dir4 = new Path(inDir, "/dir4");
- final Path dir5 = new Path(inDir, "/dir5");
static final int BLOCKSIZE = 1024;
static final byte[] databuf = new byte[BLOCKSIZE];
- private static final String DUMMY_FS_URI = "dummyfs:///";
private static final Log LOG = LogFactory.getLog(TestCombineFileInputFormat.class);
@@ -86,24 +78,6 @@ public class TestCombineFileInputFormat
return null;
}
}
-
- /** Dummy class to extend CombineFileInputFormat. It allows
- * non-existent files to be passed into the CombineFileInputFormat, allows
- * for easy testing without having to create real files.
- */
- private class DummyInputFormat1 extends DummyInputFormat {
- @Override
- protected FileStatus[] listStatus(JobConf job) throws IOException {
- Path[] files = getInputPaths(job);
- FileStatus[] results = new FileStatus[files.length];
- for (int i = 0; i < files.length; i++) {
- Path p = files[i];
- FileSystem fs = p.getFileSystem(job);
- results[i] = fs.getFileStatus(p);
- }
- return results;
- }
- }
public void testSplitPlacement() throws IOException {
String namenode = null;
@@ -112,16 +86,16 @@ public class TestCombineFileInputFormat
FileSystem fileSys = null;
String testName = "TestSplitPlacement";
try {
- /* Start 3 datanodes, one each in rack r1, r2, r3. Create five files
- * 1) file1 and file5, just after starting the datanode on r1, with
+ /* Start 3 datanodes, one each in rack r1, r2, r3. Create three files
+ * 1) file1, just after starting the datanode on r1, with
* a repl factor of 1, and,
* 2) file2, just after starting the datanode on r2, with
* a repl factor of 2, and,
- * 3) file3, file4 after starting the all three datanodes, with a repl
+ * 3) file3 after starting the all three datanodes, with a repl
* factor of 3.
- * At the end, file1, file5 will be present on only datanode1, file2 will
- * be present on datanode 1 and datanode2 and
- * file3, file4 will be present on all datanodes.
+ * At the end, file1 will be present on only datanode1, file2 will be
+ * present on datanode 1 and datanode2 and
+ * file3 will be present on all datanodes.
*/
JobConf conf = new JobConf();
conf.setBoolean("dfs.replication.considerLoad", false);
@@ -137,30 +111,6 @@ public class TestCombineFileInputFormat
}
Path file1 = new Path(dir1 + "/file1");
writeFile(conf, file1, (short)1, 1);
- // create another file on the same datanode
- Path file5 = new Path(dir5 + "/file5");
- writeFile(conf, file5, (short)1, 1);
- // split it using a CombinedFile input format
- DummyInputFormat inFormat = new DummyInputFormat();
- JobConf job = new JobConf(conf);
- FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
- InputSplit[] splits = inFormat.getSplits(job, 1);
- System.out.println("Made splits(Test0): " + splits.length);
- for (InputSplit split : splits) {
- System.out.println("File split(Test0): " + split);
- }
- assertEquals(splits.length, 1);
- CombineFileSplit fileSplit = (CombineFileSplit) splits[0];
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file5.getName(), fileSplit.getPath(1).getName());
- assertEquals(0, fileSplit.getOffset(1));
- assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]);
-
dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
dfs.waitActive();
@@ -169,14 +119,14 @@ public class TestCombineFileInputFormat
writeFile(conf, file2, (short)2, 2);
// split it using a CombinedFile input format
- inFormat = new DummyInputFormat();
+ DummyInputFormat inFormat = new DummyInputFormat();
inFormat.setInputPaths(conf, dir1 + "," + dir2);
inFormat.setMinSplitSizeRack(BLOCKSIZE);
- splits = inFormat.getSplits(conf, 1);
+ InputSplit[] splits = inFormat.getSplits(conf, 1);
System.out.println("Made splits(Test1): " + splits.length);
// make sure that each split has different locations
- fileSplit = null;
+ CombineFileSplit fileSplit = null;
for (int i = 0; i < splits.length; ++i) {
fileSplit = (CombineFileSplit) splits[i];
System.out.println("File split(Test1): " + fileSplit);
@@ -486,7 +436,7 @@ public class TestCombineFileInputFormat
}
}
}
-
+
static void writeFile(Configuration conf, Path name,
short replication, int numBlocks) throws IOException {
FileSystem fileSys = FileSystem.get(conf);
@@ -494,409 +444,12 @@ public class TestCombineFileInputFormat
FSDataOutputStream stm = fileSys.create(name, true,
conf.getInt("io.file.buffer.size", 4096),
replication, (long)BLOCKSIZE);
- writeDataAndSetReplication(fileSys, name, stm, replication, numBlocks);
- }
-
- // Creates the gzip file and return the FileStatus
- static FileStatus writeGzipFile(Configuration conf, Path name,
- short replication, int numBlocks) throws IOException {
- FileSystem fileSys = FileSystem.get(conf);
-
- GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf
- .getInt("io.file.buffer.size", 4096), replication, (long) BLOCKSIZE));
- writeDataAndSetReplication(fileSys, name, out, replication, numBlocks);
- return fileSys.getFileStatus(name);
- }
-
- private static void writeDataAndSetReplication(FileSystem fileSys, Path name,
- OutputStream out, short replication, int numBlocks) throws IOException {
for (int i = 0; i < numBlocks; i++) {
- out.write(databuf);
+ stm.write(databuf);
}
- out.close();
+ stm.close();
DFSTestUtil.waitReplication(fileSys, name, replication);
}
-
- public void testSplitPlacementForCompressedFiles() throws IOException {
- MiniDFSCluster dfs = null;
- FileSystem fileSys = null;
- try {
- /* Start 3 datanodes, one each in rack r1, r2, r3. Create five gzipped
- * files
- * 1) file1 and file5, just after starting the datanode on r1, with
- * a repl factor of 1, and,
- * 2) file2, just after starting the datanode on r2, with
- * a repl factor of 2, and,
- * 3) file3, file4 after starting the all three datanodes, with a repl
- * factor of 3.
- * At the end, file1, file5 will be present on only datanode1, file2 will
- * be present on datanode 1 and datanode2 and
- * file3, file4 will be present on all datanodes.
- */
- Configuration conf = new Configuration();
- conf.setBoolean("dfs.replication.considerLoad", false);
- dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
- dfs.waitActive();
-
- fileSys = dfs.getFileSystem();
- if (!fileSys.mkdirs(inDir)) {
- throw new IOException("Mkdirs failed to create " + inDir.toString());
- }
- Path file1 = new Path(dir1 + "/file1.gz");
- FileStatus f1 = writeGzipFile(conf, file1, (short)1, 1);
- // create another file on the same datanode
- Path file5 = new Path(dir5 + "/file5.gz");
- FileStatus f5 = writeGzipFile(conf, file5, (short)1, 1);
- // split it using a CombinedFile input format
- DummyInputFormat inFormat = new DummyInputFormat();
- JobConf job = new JobConf(conf);
- FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
- InputSplit[] splits = inFormat.getSplits(job, 1);
- System.out.println("Made splits(Test0): " + splits.length);
- for (InputSplit split : splits) {
- System.out.println("File split(Test0): " + split);
- }
- assertEquals(splits.length, 1);
- CombineFileSplit fileSplit = (CombineFileSplit) splits[0];
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f1.getLen(), fileSplit.getLength(0));
- assertEquals(file5.getName(), fileSplit.getPath(1).getName());
- assertEquals(0, fileSplit.getOffset(1));
- assertEquals(f5.getLen(), fileSplit.getLength(1));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]);
-
- dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
- dfs.waitActive();
-
- // create file on two datanodes.
- Path file2 = new Path(dir2 + "/file2.gz");
- FileStatus f2 = writeGzipFile(conf, file2, (short)2, 2);
-
- // split it using a CombinedFile input format
- inFormat = new DummyInputFormat();
- FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
- inFormat.setMinSplitSizeRack(f1.getLen());
- splits = inFormat.getSplits(job, 1);
- System.out.println("Made splits(Test1): " + splits.length);
-
- // make sure that each split has different locations
- for (InputSplit split : splits) {
- System.out.println("File split(Test1): " + split);
- }
- assertEquals(2, splits.length);
- fileSplit = (CombineFileSplit) splits[0];
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f2.getLen(), fileSplit.getLength(0));
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits[1];
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f1.getLen(), fileSplit.getLength(0));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
-
- // create another file on 3 datanodes and 3 racks.
- dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
- dfs.waitActive();
- Path file3 = new Path(dir3 + "/file3.gz");
- FileStatus f3 = writeGzipFile(conf, file3, (short)3, 3);
- inFormat = new DummyInputFormat();
- FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
- inFormat.setMinSplitSizeRack(f1.getLen());
- splits = inFormat.getSplits(job, 1);
- for (InputSplit split : splits) {
- System.out.println("File split(Test2): " + split);
- }
- assertEquals(3, splits.length);
- fileSplit = (CombineFileSplit) splits[0];
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f3.getLen(), fileSplit.getLength(0));
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
- fileSplit = (CombineFileSplit) splits[1];
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f2.getLen(), fileSplit.getLength(0));
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits[2];
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f1.getLen(), fileSplit.getLength(0));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
-
- // create file4 on all three racks
- Path file4 = new Path(dir4 + "/file4.gz");
- FileStatus f4 = writeGzipFile(conf, file4, (short)3, 3);
- inFormat = new DummyInputFormat();
- FileInputFormat.setInputPaths(job,
- dir1 + "," + dir2 + "," + dir3 + "," + dir4);
- inFormat.setMinSplitSizeRack(f1.getLen());
- splits = inFormat.getSplits(job, 1);
- for (InputSplit split : splits) {
- System.out.println("File split(Test3): " + split);
- }
- assertEquals(3, splits.length);
- fileSplit = (CombineFileSplit) splits[0];
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f3.getLen(), fileSplit.getLength(0));
- assertEquals(file4.getName(), fileSplit.getPath(1).getName());
- assertEquals(0, fileSplit.getOffset(1));
- assertEquals(f4.getLen(), fileSplit.getLength(1));
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
- fileSplit = (CombineFileSplit) splits[1];
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f2.getLen(), fileSplit.getLength(0));
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits[2];
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f1.getLen(), fileSplit.getLength(0));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
-
- // maximum split size is file1's length
- inFormat = new DummyInputFormat();
- inFormat.setMinSplitSizeNode(f1.getLen());
- inFormat.setMaxSplitSize(f1.getLen());
- FileInputFormat.setInputPaths(job,
- dir1 + "," + dir2 + "," + dir3 + "," + dir4);
- splits = inFormat.getSplits(job, 1);
- for (InputSplit split : splits) {
- System.out.println("File split(Test4): " + split);
- }
- assertEquals(4, splits.length);
- fileSplit = (CombineFileSplit) splits[0];
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f3.getLen(), fileSplit.getLength(0));
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
- fileSplit = (CombineFileSplit) splits[1];
- assertEquals(file4.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f4.getLen(), fileSplit.getLength(0));
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
- fileSplit = (CombineFileSplit) splits[2];
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f2.getLen(), fileSplit.getLength(0));
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits[3];
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f1.getLen(), fileSplit.getLength(0));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
-
- // maximum split size is twice file1's length
- inFormat = new DummyInputFormat();
- inFormat.setMinSplitSizeNode(f1.getLen());
- inFormat.setMaxSplitSize(2 * f1.getLen());
- FileInputFormat.setInputPaths(job,
- dir1 + "," + dir2 + "," + dir3 + "," + dir4);
- splits = inFormat.getSplits(job, 1);
- for (InputSplit split : splits) {
- System.out.println("File split(Test5): " + split);
- }
- assertEquals(3, splits.length);
- fileSplit = (CombineFileSplit) splits[0];
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f3.getLen(), fileSplit.getLength(0));
- assertEquals(file4.getName(), fileSplit.getPath(1).getName());
- assertEquals(0, fileSplit.getOffset(1));
- assertEquals(f4.getLen(), fileSplit.getLength(1));
- assertEquals(hosts3[0], fileSplit.getLocations()[0]);
- fileSplit = (CombineFileSplit) splits[1];
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f2.getLen(), fileSplit.getLength(0));
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits[2];
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f1.getLen(), fileSplit.getLength(0));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
-
- // maximum split size is 4 times file1's length
- inFormat = new DummyInputFormat();
- inFormat.setMinSplitSizeNode(2 * f1.getLen());
- inFormat.setMaxSplitSize(4 * f1.getLen());
- FileInputFormat.setInputPaths(job,
- dir1 + "," + dir2 + "," + dir3 + "," + dir4);
- splits = inFormat.getSplits(job, 1);
- for (InputSplit split : splits) {
- System.out.println("File split(Test6): " + split);
- }
- assertEquals(2, splits.length);
- fileSplit = (CombineFileSplit) splits[0];
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f3.getLen(), fileSplit.getLength(0));
- assertEquals(file4.getName(), fileSplit.getPath(1).getName());
- assertEquals(0, fileSplit.getOffset(1));
- assertEquals(f4.getLen(), fileSplit.getLength(1));
- assertEquals(hosts3[0], fileSplit.getLocations()[0]);
- fileSplit = (CombineFileSplit) splits[1];
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f1.getLen(), fileSplit.getLength(0));
- assertEquals(file2.getName(), fileSplit.getPath(1).getName());
- assertEquals(0, fileSplit.getOffset(1), BLOCKSIZE);
- assertEquals(f2.getLen(), fileSplit.getLength(1));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
-
- // maximum split size and min-split-size per rack is 4 times file1's length
- inFormat = new DummyInputFormat();
- inFormat.setMaxSplitSize(4 * f1.getLen());
- inFormat.setMinSplitSizeRack(4 * f1.getLen());
- FileInputFormat.setInputPaths(job,
- dir1 + "," + dir2 + "," + dir3 + "," + dir4);
- splits = inFormat.getSplits(job, 1);
- for (InputSplit split : splits) {
- System.out.println("File split(Test7): " + split);
- }
- assertEquals(1, splits.length);
- fileSplit = (CombineFileSplit) splits[0];
- assertEquals(4, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(hosts1[0], fileSplit.getLocations()[0]);
-
- // minimum split size per node is 4 times file1's length
- inFormat = new DummyInputFormat();
- inFormat.setMinSplitSizeNode(4 * f1.getLen());
- FileInputFormat.setInputPaths(job,
- dir1 + "," + dir2 + "," + dir3 + "," + dir4);
- splits = inFormat.getSplits(job, 1);
- for (InputSplit split : splits) {
- System.out.println("File split(Test8): " + split);
- }
- assertEquals(1, splits.length);
- fileSplit = (CombineFileSplit) splits[0];
- assertEquals(4, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(hosts1[0], fileSplit.getLocations()[0]);
-
- // Rack 1 has file1, file2 and file3 and file4
- // Rack 2 has file2 and file3 and file4
- // Rack 3 has file3 and file4
- // setup a filter so that only file1 and file2 can be combined
- inFormat = new DummyInputFormat();
- FileInputFormat.addInputPath(job, inDir);
- inFormat.setMinSplitSizeRack(1); // everything is at least rack local
- inFormat.createPool(job, new TestFilter(dir1),
- new TestFilter(dir2));
- splits = inFormat.getSplits(job, 1);
- for (InputSplit split : splits) {
- System.out.println("File split(Test9): " + split);
- }
- assertEquals(3, splits.length);
- fileSplit = (CombineFileSplit) splits[0];
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits[1];
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
- fileSplit = (CombineFileSplit) splits[2];
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
-
- // measure performance when there are multiple pools and
- // many files in each pool.
- int numPools = 100;
- int numFiles = 1000;
- DummyInputFormat1 inFormat1 = new DummyInputFormat1();
- for (int i = 0; i < numFiles; i++) {
- FileInputFormat.setInputPaths(job, file1);
- }
- inFormat1.setMinSplitSizeRack(1); // everything is at least rack local
- final Path dirNoMatch1 = new Path(inDir, "/dirxx");
- final Path dirNoMatch2 = new Path(inDir, "/diryy");
- for (int i = 0; i < numPools; i++) {
- inFormat1.createPool(job, new TestFilter(dirNoMatch1),
- new TestFilter(dirNoMatch2));
- }
- long start = System.currentTimeMillis();
- splits = inFormat1.getSplits(job, 1);
- long end = System.currentTimeMillis();
- System.out.println("Elapsed time for " + numPools + " pools " +
- " and " + numFiles + " files is " +
- ((end - start)) + " milli seconds.");
- } finally {
- if (dfs != null) {
- dfs.shutdown();
- }
- }
- }
-
- /**
- * Test when input files are from non-default file systems
- */
- public void testForNonDefaultFileSystem() throws Throwable {
- Configuration conf = new Configuration();
-
- // use a fake file system scheme as default
- conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DUMMY_FS_URI);
-
- // default fs path
- assertEquals(DUMMY_FS_URI, FileSystem.getDefaultUri(conf).toString());
- // add a local file
- Path localPath = new Path("testFile1");
- FileSystem lfs = FileSystem.getLocal(conf);
- FSDataOutputStream dos = lfs.create(localPath);
- dos.writeChars("Local file for CFIF");
- dos.close();
-
- conf.set("mapred.working.dir", "/");
- JobConf job = new JobConf(conf);
-
- FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath));
- DummyInputFormat inFormat = new DummyInputFormat();
- InputSplit[] splits = inFormat.getSplits(job, 1);
- assertTrue(splits.length > 0);
- for (InputSplit s : splits) {
- CombineFileSplit cfs = (CombineFileSplit)s;
- for (Path p : cfs.getPaths()) {
- assertEquals(p.toUri().getScheme(), "file");
- }
- }
- }
static class TestFilter implements PathFilter {
private Path p;
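[Along with the helpers deleted above (writeGzipFile, writeDataAndSetReplication, and DummyInputFormat1), the revert drops testSplitPlacementForCompressedFiles and testForNonDefaultFileSystem outright. That is expected: both tests pin behavior that only existed with MAPREDUCE-5038 in place. For instance (a sketch of the removed assertions, not runnable against the reverted code):

    // With the revert, a gzip file is treated as block-splitable again,
    // so an assertion of the removed shape
    //   assertEquals(f1.getLen(), fileSplit.getLength(0));
    // would fail whenever the compressed file spans more than one block.]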
@@ -909,7 +462,7 @@ public class TestCombineFileInputFormat
// returns true if the specified path matches the prefix stored
// in this TestFilter.
public boolean accept(Path path) {
- if (path.toUri().getPath().indexOf(p.toString()) == 0) {
+ if (path.toString().indexOf(p.toString()) == 0) {
return true;
}
return false;
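[The accept change above pairs with the path-stripping restored in getSplits: the filter now matches on toString(), which only equals the bare path when the Path carries no scheme. A quick contrast (illustration only; the values are made up):

    Path p = new Path("hdfs://nn:8020/user/in/dir1/file1");
    p.toUri().getPath(); // "/user/in/dir1/file1"               (old comparison)
    p.toString();        // "hdfs://nn:8020/user/in/dir1/file1" (new comparison)

Because getSplits hands the filter scheme-less paths (new Path(uri.getPath()) in the main diff), the prefix test still matches at index 0 there; fully qualified paths, as exercised by the removed testForNonDefaultFileSystem, would not.]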