Posted to common-commits@hadoop.apache.org by tu...@apache.org on 2013/03/18 22:36:01 UTC
svn commit: r1458021 - in /hadoop/common/branches/branch-1: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
Author: tucu
Date: Mon Mar 18 21:36:01 2013
New Revision: 1458021
URL: http://svn.apache.org/r1458021
Log:
MAPREDUCE-5038. old API CombineFileInputFormat missing fixes that are in new API. (sandyr via tucu)
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1458021&r1=1458020&r2=1458021&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Mon Mar 18 21:36:01 2013
@@ -22,6 +22,9 @@ Release 1.3.0 - unreleased
MAPREDUCE-5073. TestJobStatusPersistency.testPersistency fails on
JDK7. (sandyr via tucu)
+ MAPREDUCE-5038. old API CombineFileInputFormat missing fixes that are
+ in new API. (sandyr via tucu)
+
Release 1.2.0 - unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java?rev=1458021&r1=1458020&r2=1458021&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java Mon Mar 18 21:36:01 2013
@@ -20,7 +20,9 @@ package org.apache.hadoop.mapred.lib;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.HashMap;
import java.util.Set;
@@ -33,6 +35,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.NetworkTopology;
@@ -75,7 +79,7 @@ public abstract class CombineFileInputFo
private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();
// mapping from a rack name to the set of Nodes in the rack
- private static HashMap<String, Set<String>> rackToNodes =
+ private HashMap<String, Set<String>> rackToNodes =
new HashMap<String, Set<String>>();
/**
* Specify the maximum size (in bytes) of each split. Each split is
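Note on the hunk above: rackToNodes had been static, so two CombineFileInputFormat instances computing splits in the same JVM would share, and later free, the same rack-to-host map. Making it an instance field scopes that bookkeeping to a single format instance; the companion hunks further down thread the map explicitly through addHostToRack() and getHosts() instead of reaching for static state.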
@@ -128,6 +132,16 @@ public abstract class CombineFileInputFo
pools.add(multi);
}
+ @Override
+ protected boolean isSplitable(FileSystem fs, Path file) {
+ final CompressionCodec codec =
+ new CompressionCodecFactory(fs.getConf()).getCodec(file);
+ if (null == codec) {
+ return true;
+ }
+ return codec instanceof SplittableCompressionCodec;
+ }
+
/**
* default constructor
*/
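Pulled out of diff context, the new isSplitable() override reduces to a simple rule: a file with no compression codec is splittable, and a compressed file is splittable only when its codec implements SplittableCompressionCodec. A minimal stand-alone sketch (the class name here is illustrative):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.io.compress.SplittableCompressionCodec;

    public class SplittableCheck {
      // Mirrors the override above: a null codec means an uncompressed file
      // (splittable); otherwise only codecs that declare themselves
      // splittable (e.g. bzip2) may be split.
      public static boolean isSplitable(FileSystem fs, Path file) {
        CompressionCodec codec =
            new CompressionCodecFactory(fs.getConf()).getCodec(file);
        return codec == null || codec instanceof SplittableCompressionCodec;
      }
    }

This is what the gzip tests added below rely on: each .gz file must come back as a single block no matter what maxSize is.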
@@ -181,24 +195,31 @@ public abstract class CombineFileInputFo
if (paths.length == 0) {
return splits.toArray(new CombineFileSplit[splits.size()]);
}
+
+ // Convert them to Paths first. This is a costly operation and
+ // we should do it first, otherwise we will incur doing it multiple
+ // times, one time each for each pool in the next loop.
+ List<Path> newpaths = new LinkedList<Path>();
+ for (int i = 0; i < paths.length; i++) {
+ FileSystem fs = paths[i].getFileSystem(job);
+ Path p = fs.makeQualified(paths[i]);
+ newpaths.add(p);
+ }
+ paths = null;
// In one single iteration, process all the paths in a single pool.
- // Processing one pool at a time ensures that a split contans paths
+ // Processing one pool at a time ensures that a split contains paths
// from a single pool only.
for (MultiPathFilter onepool : pools) {
ArrayList<Path> myPaths = new ArrayList<Path>();
// pick one input path. If it matches all the filters in a pool,
// add it to the output set
- for (int i = 0; i < paths.length; i++) {
- if (paths[i] == null) { // already processed
- continue;
- }
- FileSystem fs = paths[i].getFileSystem(job);
- Path p = new Path(paths[i].toUri().getPath());
+ for (Iterator<Path> iter = newpaths.iterator(); iter.hasNext();) {
+ Path p = iter.next();
if (onepool.accept(p)) {
- myPaths.add(paths[i]); // add it to my output set
- paths[i] = null; // already processed
+ myPaths.add(p); // add it to my output set
+ iter.remove();
}
}
// create splits for all files in this pool.
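Two things are going on in the hunk above. First, each path is qualified exactly once up front; fs.makeQualified() resolves a bare path such as /user/foo/in against its file system into a fully qualified one such as hdfs://nn:8020/user/foo/in (values illustrative), so the per-pool loop no longer repeats that work for every pool. Second, accepted paths are deleted from newpaths via iter.remove(), replacing the old null-marker bookkeeping.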
@@ -206,16 +227,8 @@ public abstract class CombineFileInputFo
maxSize, minSizeNode, minSizeRack, splits);
}
- // Finally, process all paths that do not belong to any pool.
- ArrayList<Path> myPaths = new ArrayList<Path>();
- for (int i = 0; i < paths.length; i++) {
- if (paths[i] == null) { // already processed
- continue;
- }
- myPaths.add(paths[i]);
- }
// create splits for all files that are not in any pool.
- getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
+ getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]),
maxSize, minSizeNode, minSizeRack, splits);
// free up rackToNodes map
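Because accepted paths were removed from newpaths in place, whatever survives the pool loop is exactly the set of paths that belong to no pool, so the old scan that rebuilt a leftover list by skipping null entries could be dropped.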
@@ -254,13 +267,15 @@ public abstract class CombineFileInputFo
// populate all the blocks for all files
long totLength = 0;
for (int i = 0; i < paths.length; i++) {
- files[i] = new OneFileInfo(paths[i], job,
- rackToBlocks, blockToNodes, nodeToBlocks);
+ files[i] = new OneFileInfo(paths[i], job,
+ isSplitable(paths[i].getFileSystem(job), paths[i]),
+ rackToBlocks, blockToNodes, nodeToBlocks,
+ rackToNodes, maxSize);
totLength += files[i].getLength();
}
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
- ArrayList<String> nodes = new ArrayList<String>();
+ Set<String> nodes = new HashSet<String>();
long curSplitSize = 0;
// process all nodes and create splits that are local
@@ -313,7 +328,7 @@ public abstract class CombineFileInputFo
// in 'overflow'. After the processing of all racks is complete, these overflow
// blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
- ArrayList<String> racks = new ArrayList<String>();
+ Set<String> racks = new HashSet<String>();
// Process all racks over and over again until there is no more work to do.
while (blockToNodes.size() > 0) {
@@ -418,7 +433,7 @@ public abstract class CombineFileInputFo
*/
private void addCreatedSplit(JobConf job,
List<CombineFileSplit> splitList,
- List<String> locations,
+ Collection<String> locations,
ArrayList<OneBlockInfo> validBlocks) {
// create an input split
Path[] fl = new Path[validBlocks.size()];
@@ -451,9 +466,12 @@ public abstract class CombineFileInputFo
private OneBlockInfo[] blocks; // all blocks in this file
OneFileInfo(Path path, JobConf job,
+ boolean isSplitable,
HashMap<String, List<OneBlockInfo>> rackToBlocks,
HashMap<OneBlockInfo, String[]> blockToNodes,
- HashMap<String, List<OneBlockInfo>> nodeToBlocks)
+ HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+ HashMap<String, Set<String>> rackToNodes,
+ long maxSize)
throws IOException {
this.fileSize = 0;
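The widened OneFileInfo constructor is the other half of removing the static state: rackToNodes and maxSize are now passed in explicitly, so block bookkeeping never touches class-level fields, and maxSize lets the constructor pre-chop oversized blocks, as the next hunk shows.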
@@ -466,32 +484,77 @@ public abstract class CombineFileInputFo
if (locations == null) {
blocks = new OneBlockInfo[0];
} else {
- blocks = new OneBlockInfo[locations.length];
- for (int i = 0; i < locations.length; i++) {
-
- fileSize += locations[i].getLength();
- OneBlockInfo oneblock = new OneBlockInfo(path,
- locations[i].getOffset(),
- locations[i].getLength(),
- locations[i].getHosts(),
- locations[i].getTopologyPaths());
- blocks[i] = oneblock;
+ if (!isSplitable) {
+ // if the file is not splitable, just create the one block with
+ // full file length
+ blocks = new OneBlockInfo[1];
+ fileSize = stat.getLen();
+ blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
+ .getHosts(), locations[0].getTopologyPaths());
+ } else {
+ ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
+ locations.length);
+ for (int i = 0; i < locations.length; i++) {
+ fileSize += locations[i].getLength();
+
+ // each split can be a maximum of maxSize
+ long left = locations[i].getLength();
+ long myOffset = locations[i].getOffset();
+ long myLength = 0;
+ while (left > 0) {
+ if (maxSize == 0) {
+ myLength = left;
+ } else {
+ if (left > maxSize && left < 2 * maxSize) {
+ // if remainder is between max and 2*max - then
+ // instead of creating splits of size max, left-max we
+ // create splits of size left/2 and left/2. This is
+ // a heuristic to avoid creating really really small
+ // splits.
+ myLength = left / 2;
+ } else {
+ myLength = Math.min(maxSize, left);
+ }
+ }
+ OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
+ myLength, locations[i].getHosts(), locations[i]
+ .getTopologyPaths());
+ left -= myLength;
+ myOffset += myLength;
+ blocksList.add(oneblock);
+ }
+ }
+ blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
+ }
+
+ for (OneBlockInfo oneblock : blocks) {
// add this block to the block --> node locations map
blockToNodes.put(oneblock, oneblock.hosts);
+ // For blocks that do not have host/rack information,
+ // assign to default rack.
+ String[] racks = null;
+ if (oneblock.hosts.length == 0) {
+ racks = new String[]{NetworkTopology.DEFAULT_RACK};
+ } else {
+ racks = oneblock.racks;
+ }
+
// add this block to the rack --> block map
- for (int j = 0; j < oneblock.racks.length; j++) {
- String rack = oneblock.racks[j];
+ for (int j = 0; j < racks.length; j++) {
+ String rack = racks[j];
List<OneBlockInfo> blklist = rackToBlocks.get(rack);
if (blklist == null) {
blklist = new ArrayList<OneBlockInfo>();
rackToBlocks.put(rack, blklist);
}
blklist.add(oneblock);
- // Add this host to rackToNodes map
- addHostToRack(oneblock.racks[j], oneblock.hosts[j]);
- }
+ if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+ // Add this host to rackToNodes map
+ addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
+ }
+ }
// add this block to the node --> block map
for (int j = 0; j < oneblock.hosts.length; j++) {
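A worked example of the left/2 heuristic above, with illustrative numbers: take maxSize = 100 and a 250-unit block. Pass 1: left = 250 is not strictly between 100 and 200, so the chunk is min(100, 250) = 100, leaving 150. Pass 2: 100 < 150 < 200, so the chunk is 150 / 2 = 75, leaving 75. Pass 3: the chunk is min(100, 75) = 75, leaving 0. The block yields pieces of 100, 75, 75 rather than 100, 100, 50, avoiding the tiny 50-unit tail split.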
@@ -554,7 +617,8 @@ public abstract class CombineFileInputFo
}
}
- private static void addHostToRack(String rack, String host) {
+ private static void addHostToRack(HashMap<String, Set<String>> rackToNodes,
+ String rack, String host) {
Set<String> hosts = rackToNodes.get(rack);
if (hosts == null) {
hosts = new HashSet<String>();
@@ -562,11 +626,14 @@ public abstract class CombineFileInputFo
}
hosts.add(host);
}
+
- private static List<String> getHosts(List<String> racks) {
- List<String> hosts = new ArrayList<String>();
+ private Set<String> getHosts(Set<String> racks) {
+ Set<String> hosts = new HashSet<String>();
for (String rack : racks) {
- hosts.addAll(rackToNodes.get(rack));
+ if (rackToNodes.containsKey(rack)) {
+ hosts.addAll(rackToNodes.get(rack));
+ }
}
return hosts;
}
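getHosts() must now tolerate racks that never made it into rackToNodes, because the loop above deliberately skips recording hosts for NetworkTopology.DEFAULT_RACK. Pulled out of diff context, and functionally equivalent to the containsKey guard (method name illustrative; uses java.util.Map, Set, HashSet):

    static Set<String> hostsForRacks(Map<String, Set<String>> rackToNodes,
                                     Set<String> racks) {
      Set<String> hosts = new HashSet<String>();
      for (String rack : racks) {
        Set<String> nodes = rackToNodes.get(rack);
        if (nodes != null) {  // rack may be absent, e.g. the default rack
          hosts.addAll(nodes);
        }
      }
      return hosts;
    }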
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java?rev=1458021&r1=1458020&r2=1458021&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java Mon Mar 18 21:36:01 2013
@@ -18,12 +18,17 @@
package org.apache.hadoop.mapred.lib;
import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.zip.GZIPOutputStream;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -32,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSTestUti
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
@@ -64,9 +70,11 @@ public class TestCombineFileInputFormat
final Path dir2 = new Path(inDir, "/dir2");
final Path dir3 = new Path(inDir, "/dir3");
final Path dir4 = new Path(inDir, "/dir4");
+ final Path dir5 = new Path(inDir, "/dir5");
static final int BLOCKSIZE = 1024;
static final byte[] databuf = new byte[BLOCKSIZE];
+ private static final String DUMMY_FS_URI = "dummyfs:///";
private static final Log LOG = LogFactory.getLog(TestCombineFileInputFormat.class);
@@ -78,6 +86,24 @@ public class TestCombineFileInputFormat
return null;
}
}
+
+ /** Dummy class to extend CombineFileInputFormat. It allows
+ * non-existent files to be passed into the CombineFileInputFormat, allows
+ * for easy testing without having to create real files.
+ */
+ private class DummyInputFormat1 extends DummyInputFormat {
+ @Override
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
+ Path[] files = getInputPaths(job);
+ FileStatus[] results = new FileStatus[files.length];
+ for (int i = 0; i < files.length; i++) {
+ Path p = files[i];
+ FileSystem fs = p.getFileSystem(job);
+ results[i] = fs.getFileStatus(p);
+ }
+ return results;
+ }
+ }
public void testSplitPlacement() throws IOException {
String namenode = null;
@@ -86,16 +112,16 @@ public class TestCombineFileInputFormat
FileSystem fileSys = null;
String testName = "TestSplitPlacement";
try {
- /* Start 3 datanodes, one each in rack r1, r2, r3. Create three files
- * 1) file1, just after starting the datanode on r1, with
+ /* Start 3 datanodes, one each in rack r1, r2, r3. Create five files
+ * 1) file1 and file5, just after starting the datanode on r1, with
* a repl factor of 1, and,
* 2) file2, just after starting the datanode on r2, with
* a repl factor of 2, and,
- * 3) file3 after starting the all three datanodes, with a repl
+ * 3) file3, file4 after starting the all three datanodes, with a repl
* factor of 3.
- * At the end, file1 will be present on only datanode1, file2 will be
- * present on datanode 1 and datanode2 and
- * file3 will be present on all datanodes.
+ * At the end, file1, file5 will be present on only datanode1, file2 will
+ * be present on datanode 1 and datanode2 and
+ * file3, file4 will be present on all datanodes.
*/
JobConf conf = new JobConf();
conf.setBoolean("dfs.replication.considerLoad", false);
@@ -111,6 +137,30 @@ public class TestCombineFileInputFormat
}
Path file1 = new Path(dir1 + "/file1");
writeFile(conf, file1, (short)1, 1);
+ // create another file on the same datanode
+ Path file5 = new Path(dir5 + "/file5");
+ writeFile(conf, file5, (short)1, 1);
+ // split it using a CombinedFile input format
+ DummyInputFormat inFormat = new DummyInputFormat();
+ JobConf job = new JobConf(conf);
+ FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
+ InputSplit[] splits = inFormat.getSplits(job, 1);
+ System.out.println("Made splits(Test0): " + splits.length);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test0): " + split);
+ }
+ assertEquals(splits.length, 1);
+ CombineFileSplit fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file5.getName(), fileSplit.getPath(1).getName());
+ assertEquals(0, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
dfs.waitActive();
@@ -119,14 +169,14 @@ public class TestCombineFileInputFormat
writeFile(conf, file2, (short)2, 2);
// split it using a CombinedFile input format
- DummyInputFormat inFormat = new DummyInputFormat();
+ inFormat = new DummyInputFormat();
inFormat.setInputPaths(conf, dir1 + "," + dir2);
inFormat.setMinSplitSizeRack(BLOCKSIZE);
- InputSplit[] splits = inFormat.getSplits(conf, 1);
+ splits = inFormat.getSplits(conf, 1);
System.out.println("Made splits(Test1): " + splits.length);
// make sure that each split has different locations
- CombineFileSplit fileSplit = null;
+ fileSplit = null;
for (int i = 0; i < splits.length; ++i) {
fileSplit = (CombineFileSplit) splits[i];
System.out.println("File split(Test1): " + fileSplit);
@@ -436,7 +486,7 @@ public class TestCombineFileInputFormat
}
}
}
-
+
static void writeFile(Configuration conf, Path name,
short replication, int numBlocks) throws IOException {
FileSystem fileSys = FileSystem.get(conf);
@@ -444,12 +494,409 @@ public class TestCombineFileInputFormat
FSDataOutputStream stm = fileSys.create(name, true,
conf.getInt("io.file.buffer.size", 4096),
replication, (long)BLOCKSIZE);
+ writeDataAndSetReplication(fileSys, name, stm, replication, numBlocks);
+ }
+
+ // Creates the gzip file and return the FileStatus
+ static FileStatus writeGzipFile(Configuration conf, Path name,
+ short replication, int numBlocks) throws IOException {
+ FileSystem fileSys = FileSystem.get(conf);
+
+ GZIPOutputStream out = new GZIPOutputStream(fileSys.create(name, true, conf
+ .getInt("io.file.buffer.size", 4096), replication, (long) BLOCKSIZE));
+ writeDataAndSetReplication(fileSys, name, out, replication, numBlocks);
+ return fileSys.getFileStatus(name);
+ }
+
+ private static void writeDataAndSetReplication(FileSystem fileSys, Path name,
+ OutputStream out, short replication, int numBlocks) throws IOException {
for (int i = 0; i < numBlocks; i++) {
- stm.write(databuf);
+ out.write(databuf);
}
- stm.close();
+ out.close();
DFSTestUtil.waitReplication(fileSys, name, replication);
}
+
+ public void testSplitPlacementForCompressedFiles() throws IOException {
+ MiniDFSCluster dfs = null;
+ FileSystem fileSys = null;
+ try {
+ /* Start 3 datanodes, one each in rack r1, r2, r3. Create five gzipped
+ * files
+ * 1) file1 and file5, just after starting the datanode on r1, with
+ * a repl factor of 1, and,
+ * 2) file2, just after starting the datanode on r2, with
+ * a repl factor of 2, and,
+ * 3) file3, file4 after starting the all three datanodes, with a repl
+ * factor of 3.
+ * At the end, file1, file5 will be present on only datanode1, file2 will
+ * be present on datanode 1 and datanode2 and
+ * file3, file4 will be present on all datanodes.
+ */
+ Configuration conf = new Configuration();
+ conf.setBoolean("dfs.replication.considerLoad", false);
+ dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
+ dfs.waitActive();
+
+ fileSys = dfs.getFileSystem();
+ if (!fileSys.mkdirs(inDir)) {
+ throw new IOException("Mkdirs failed to create " + inDir.toString());
+ }
+ Path file1 = new Path(dir1 + "/file1.gz");
+ FileStatus f1 = writeGzipFile(conf, file1, (short)1, 1);
+ // create another file on the same datanode
+ Path file5 = new Path(dir5 + "/file5.gz");
+ FileStatus f5 = writeGzipFile(conf, file5, (short)1, 1);
+ // split it using a CombinedFile input format
+ DummyInputFormat inFormat = new DummyInputFormat();
+ JobConf job = new JobConf(conf);
+ FileInputFormat.setInputPaths(job, dir1 + "," + dir5);
+ InputSplit[] splits = inFormat.getSplits(job, 1);
+ System.out.println("Made splits(Test0): " + splits.length);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test0): " + split);
+ }
+ assertEquals(splits.length, 1);
+ CombineFileSplit fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(file5.getName(), fileSplit.getPath(1).getName());
+ assertEquals(0, fileSplit.getOffset(1));
+ assertEquals(f5.getLen(), fileSplit.getLength(1));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+ dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
+ dfs.waitActive();
+
+ // create file on two datanodes.
+ Path file2 = new Path(dir2 + "/file2.gz");
+ FileStatus f2 = writeGzipFile(conf, file2, (short)2, 2);
+
+ // split it using a CombinedFile input format
+ inFormat = new DummyInputFormat();
+ FileInputFormat.setInputPaths(job, dir1 + "," + dir2);
+ inFormat.setMinSplitSizeRack(f1.getLen());
+ splits = inFormat.getSplits(job, 1);
+ System.out.println("Made splits(Test1): " + splits.length);
+
+ // make sure that each split has different locations
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test1): " + split);
+ }
+ assertEquals(2, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ fileSplit = (CombineFileSplit) splits[1];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ // create another file on 3 datanodes and 3 racks.
+ dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
+ dfs.waitActive();
+ Path file3 = new Path(dir3 + "/file3.gz");
+ FileStatus f3 = writeGzipFile(conf, file3, (short)3, 3);
+ inFormat = new DummyInputFormat();
+ FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
+ inFormat.setMinSplitSizeRack(f1.getLen());
+ splits = inFormat.getSplits(job, 1);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test2): " + split);
+ }
+ assertEquals(3, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f3.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+ fileSplit = (CombineFileSplit) splits[1];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ fileSplit = (CombineFileSplit) splits[2];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ // create file4 on all three racks
+ Path file4 = new Path(dir4 + "/file4.gz");
+ FileStatus f4 = writeGzipFile(conf, file4, (short)3, 3);
+ inFormat = new DummyInputFormat();
+ FileInputFormat.setInputPaths(job,
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+ inFormat.setMinSplitSizeRack(f1.getLen());
+ splits = inFormat.getSplits(job, 1);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test3): " + split);
+ }
+ assertEquals(3, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f3.getLen(), fileSplit.getLength(0));
+ assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+ assertEquals(0, fileSplit.getOffset(1));
+ assertEquals(f4.getLen(), fileSplit.getLength(1));
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+ fileSplit = (CombineFileSplit) splits[1];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ fileSplit = (CombineFileSplit) splits[2];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ // maximum split size is file1's length
+ inFormat = new DummyInputFormat();
+ inFormat.setMinSplitSizeNode(f1.getLen());
+ inFormat.setMaxSplitSize(f1.getLen());
+ FileInputFormat.setInputPaths(job,
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+ splits = inFormat.getSplits(job, 1);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test4): " + split);
+ }
+ assertEquals(4, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f3.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+ fileSplit = (CombineFileSplit) splits[1];
+ assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f4.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+ fileSplit = (CombineFileSplit) splits[2];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ fileSplit = (CombineFileSplit) splits[3];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ // maximum split size is twice file1's length
+ inFormat = new DummyInputFormat();
+ inFormat.setMinSplitSizeNode(f1.getLen());
+ inFormat.setMaxSplitSize(2 * f1.getLen());
+ FileInputFormat.setInputPaths(job,
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+ splits = inFormat.getSplits(job, 1);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test5): " + split);
+ }
+ assertEquals(3, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f3.getLen(), fileSplit.getLength(0));
+ assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+ assertEquals(0, fileSplit.getOffset(1));
+ assertEquals(f4.getLen(), fileSplit.getLength(1));
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+ fileSplit = (CombineFileSplit) splits[1];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ fileSplit = (CombineFileSplit) splits[2];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ // maximum split size is 4 times file1's length
+ inFormat = new DummyInputFormat();
+ inFormat.setMinSplitSizeNode(2 * f1.getLen());
+ inFormat.setMaxSplitSize(4 * f1.getLen());
+ FileInputFormat.setInputPaths(job,
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+ splits = inFormat.getSplits(job, 1);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test6): " + split);
+ }
+ assertEquals(2, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f3.getLen(), fileSplit.getLength(0));
+ assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+ assertEquals(0, fileSplit.getOffset(1));
+ assertEquals(f4.getLen(), fileSplit.getLength(1));
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+ fileSplit = (CombineFileSplit) splits[1];
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+ assertEquals(0, fileSplit.getOffset(1), BLOCKSIZE);
+ assertEquals(f2.getLen(), fileSplit.getLength(1));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ // maximum split size and min-split-size per rack is 4 times file1's length
+ inFormat = new DummyInputFormat();
+ inFormat.setMaxSplitSize(4 * f1.getLen());
+ inFormat.setMinSplitSizeRack(4 * f1.getLen());
+ FileInputFormat.setInputPaths(job,
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+ splits = inFormat.getSplits(job, 1);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test7): " + split);
+ }
+ assertEquals(1, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(4, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+ // minimum split size per node is 4 times file1's length
+ inFormat = new DummyInputFormat();
+ inFormat.setMinSplitSizeNode(4 * f1.getLen());
+ FileInputFormat.setInputPaths(job,
+ dir1 + "," + dir2 + "," + dir3 + "," + dir4);
+ splits = inFormat.getSplits(job, 1);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test8): " + split);
+ }
+ assertEquals(1, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(4, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+
+ // Rack 1 has file1, file2 and file3 and file4
+ // Rack 2 has file2 and file3 and file4
+ // Rack 3 has file3 and file4
+ // setup a filter so that only file1 and file2 can be combined
+ inFormat = new DummyInputFormat();
+ FileInputFormat.addInputPath(job, inDir);
+ inFormat.setMinSplitSizeRack(1); // everything is at least rack local
+ inFormat.createPool(job, new TestFilter(dir1),
+ new TestFilter(dir2));
+ splits = inFormat.getSplits(job, 1);
+ for (InputSplit split : splits) {
+ System.out.println("File split(Test9): " + split);
+ }
+ assertEquals(3, splits.length);
+ fileSplit = (CombineFileSplit) splits[0];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ fileSplit = (CombineFileSplit) splits[1];
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+ fileSplit = (CombineFileSplit) splits[2];
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+
+ // measure performance when there are multiple pools and
+ // many files in each pool.
+ int numPools = 100;
+ int numFiles = 1000;
+ DummyInputFormat1 inFormat1 = new DummyInputFormat1();
+ for (int i = 0; i < numFiles; i++) {
+ FileInputFormat.setInputPaths(job, file1);
+ }
+ inFormat1.setMinSplitSizeRack(1); // everything is at least rack local
+ final Path dirNoMatch1 = new Path(inDir, "/dirxx");
+ final Path dirNoMatch2 = new Path(inDir, "/diryy");
+ for (int i = 0; i < numPools; i++) {
+ inFormat1.createPool(job, new TestFilter(dirNoMatch1),
+ new TestFilter(dirNoMatch2));
+ }
+ long start = System.currentTimeMillis();
+ splits = inFormat1.getSplits(job, 1);
+ long end = System.currentTimeMillis();
+ System.out.println("Elapsed time for " + numPools + " pools " +
+ " and " + numFiles + " files is " +
+ ((end - start)) + " milli seconds.");
+ } finally {
+ if (dfs != null) {
+ dfs.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test when input files are from non-default file systems
+ */
+ public void testForNonDefaultFileSystem() throws Throwable {
+ Configuration conf = new Configuration();
+
+ // use a fake file system scheme as default
+ conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DUMMY_FS_URI);
+
+ // default fs path
+ assertEquals(DUMMY_FS_URI, FileSystem.getDefaultUri(conf).toString());
+ // add a local file
+ Path localPath = new Path("testFile1");
+ FileSystem lfs = FileSystem.getLocal(conf);
+ FSDataOutputStream dos = lfs.create(localPath);
+ dos.writeChars("Local file for CFIF");
+ dos.close();
+
+ conf.set("mapred.working.dir", "/");
+ JobConf job = new JobConf(conf);
+
+ FileInputFormat.setInputPaths(job, lfs.makeQualified(localPath));
+ DummyInputFormat inFormat = new DummyInputFormat();
+ InputSplit[] splits = inFormat.getSplits(job, 1);
+ assertTrue(splits.length > 0);
+ for (InputSplit s : splits) {
+ CombineFileSplit cfs = (CombineFileSplit)s;
+ for (Path p : cfs.getPaths()) {
+ assertEquals(p.toUri().getScheme(), "file");
+ }
+ }
+ }
static class TestFilter implements PathFilter {
private Path p;
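The testForNonDefaultFileSystem case above exercises the qualify-first change directly: with fs.default.name pointed at a dummy scheme, local file: paths handed to the format must keep their own scheme in the resulting splits rather than being re-resolved against the default file system.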
@@ -462,7 +909,7 @@ public class TestCombineFileInputFormat
// returns true if the specified path matches the prefix stored
// in this TestFilter.
public boolean accept(Path path) {
- if (path.toString().indexOf(p.toString()) == 0) {
+ if (path.toUri().getPath().indexOf(p.toString()) == 0) {
return true;
}
return false;
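The TestFilter fix at the end matters once input paths are fully qualified by the earlier change: Path.toString() on a qualified path includes scheme and authority, so a prefix match against a bare directory path can never succeed. With an illustrative URI:

    // old: "hdfs://nn:8020/in/dir1".indexOf("/in/dir1") == 0  -> false
    // new: new Path("hdfs://nn:8020/in/dir1").toUri().getPath()
    //      yields "/in/dir1", and "/in/dir1".indexOf("/in/dir1") == 0 -> true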