Posted to common-commits@hadoop.apache.org by zs...@apache.org on 2009/10/19 03:25:01 UTC
svn commit: r826573 - in /hadoop/common/branches/branch-0.20: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
Author: zshao
Date: Mon Oct 19 01:25:01 2009
New Revision: 826573
URL: http://svn.apache.org/viewvc?rev=826573&view=rev
Log:
HADOOP-5759. Fix for IllegalArgumentException when CombineFileInputFormat
is used as job InputFormat. (Amareshwari Sriramadasu via zshao)
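
Background on the fix: before this patch, getSplits() returned CombineFileSplits
whose locations were rack names such as "/r2" rather than hostnames, which is
what triggered the IllegalArgumentException downstream. The patch below keeps a
rack-to-hosts map while blocks are grouped into splits and resolves each rack to
its member hosts before a split is created. A minimal standalone sketch of that
resolution step, using hypothetical rack and host names and the same
HashMap<String, Set<String>> shape the patch introduces:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class RackResolutionSketch {
      // rack name -> hosts known to hold blocks in that rack
      private static final Map<String, Set<String>> rackToNodes =
        new HashMap<String, Set<String>>();

      private static void addHostToRack(String rack, String host) {
        Set<String> hosts = rackToNodes.get(rack);
        if (hosts == null) {
          hosts = new HashSet<String>();
          rackToNodes.put(rack, hosts);
        }
        hosts.add(host);
      }

      public static void main(String[] args) {
        // Hypothetical topology: two hosts in rack /r1, one host in /r2.
        addHostToRack("/r1", "host1.example.com");
        addHostToRack("/r1", "host2.example.com");
        addHostToRack("/r2", "host3.example.com");
        // Resolve the racks backing a split into hostnames, as getHosts() does.
        List<String> locations = new ArrayList<String>();
        for (String rack : Arrays.asList("/r1", "/r2")) {
          locations.addAll(rackToNodes.get(rack));
        }
        // Split locations are now hostnames, not rack names like "/r1".
        System.out.println(locations);
      }
    }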
Modified:
hadoop/common/branches/branch-0.20/CHANGES.txt
hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
Modified: hadoop/common/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/CHANGES.txt?rev=826573&r1=826572&r2=826573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20/CHANGES.txt Mon Oct 19 01:25:01 2009
@@ -32,6 +32,9 @@
MAPREDUCE-1070. Prevent a deadlock in the fair scheduler servlet.
(Todd Lipcon via cdouglas)
+ HADOOP-5759. Fix for IllegalArgumentException when CombineFileInputFormat
+ is used as job InputFormat. (Amareshwari Sriramadasu via zshao)
+
Release 0.20.1 - 2009-09-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java?rev=826573&r1=826572&r2=826573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java Mon Oct 19 01:25:01 2009
@@ -20,12 +20,12 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.HashMap;
import java.util.Set;
import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -73,6 +73,9 @@
// across multiple pools.
private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();
+ // mapping from a rack name to the set of Nodes in the rack
+ private static HashMap<String, Set<String>> rackToNodes =
+ new HashMap<String, Set<String>>();
/**
* Specify the maximum size (in bytes) of each split. Each split is
* approximately equal to the specified size.
@@ -214,6 +217,8 @@
getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
maxSize, minSizeNode, minSizeRack, splits);
+ // free up rackToNodes map
+ rackToNodes.clear();
return splits.toArray(new CombineFileSplit[splits.size()]);
}
@@ -341,7 +346,7 @@
// create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
- addCreatedSplit(job, splits, racks, validBlocks);
+ addCreatedSplit(job, splits, getHosts(racks), validBlocks);
createdSplit = true;
break;
}
@@ -360,7 +365,7 @@
if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
// if there is a minimum size specified, then create a single split
// otherwise, store these blocks into overflow data structure
- addCreatedSplit(job, splits, racks, validBlocks);
+ addCreatedSplit(job, splits, getHosts(racks), validBlocks);
} else {
// There were a few blocks in this rack that remained to be processed.
// Keep them in 'overflow' block list. These will be combined later.
@@ -393,7 +398,7 @@
// create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
- addCreatedSplit(job, splits, racks, validBlocks);
+ addCreatedSplit(job, splits, getHosts(racks), validBlocks);
curSplitSize = 0;
validBlocks.clear();
racks.clear();
@@ -402,7 +407,7 @@
// Process any remaining blocks, if any.
if (!validBlocks.isEmpty()) {
- addCreatedSplit(job, splits, racks, validBlocks);
+ addCreatedSplit(job, splits, getHosts(racks), validBlocks);
}
}
@@ -412,13 +417,12 @@
*/
private void addCreatedSplit(JobConf job,
List<CombineFileSplit> splitList,
- List<String> racks,
+ List<String> locations,
ArrayList<OneBlockInfo> validBlocks) {
// create an input split
Path[] fl = new Path[validBlocks.size()];
long[] offset = new long[validBlocks.size()];
long[] length = new long[validBlocks.size()];
- String[] rackLocations = racks.toArray(new String[racks.size()]);
for (int i = 0; i < validBlocks.size(); i++) {
fl[i] = validBlocks.get(i).onepath;
offset[i] = validBlocks.get(i).offset;
@@ -427,7 +431,7 @@
// add this split to the list that is returned
CombineFileSplit thissplit = new CombineFileSplit(job, fl, offset,
- length, rackLocations);
+ length, locations.toArray(new String[0]));
splitList.add(thissplit);
}
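
For reference, the CombineFileSplit constructor used above takes parallel arrays
of paths, offsets, and lengths plus a locations array; after this change the
locations are hostnames. A hedged sketch of one such call, with hypothetical
file paths and hosts:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.CombineFileSplit;

    // Build one split over two hypothetical files, located on one host.
    static CombineFileSplit exampleSplit(JobConf job) {
      Path[] files = { new Path("/in/a"), new Path("/in/b") }; // hypothetical inputs
      long[] offsets = { 0L, 0L };
      long[] lengths = { 1024L, 2048L };
      String[] hosts = { "host1.example.com" };                // hostnames, not "/r1"
      return new CombineFileSplit(job, files, offsets, lengths, hosts);
    }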
@@ -484,7 +488,9 @@
rackToBlocks.put(rack, blklist);
}
blklist.add(oneblock);
- }
+ // Add this host to rackToNodes map
+ addHostToRack(oneblock.racks[j], oneblock.hosts[j]);
+ }
// add this block to the node --> block map
for (int j = 0; j < oneblock.hosts.length; j++) {
@@ -547,6 +553,23 @@
}
}
+ private static void addHostToRack(String rack, String host) {
+ Set<String> hosts = rackToNodes.get(rack);
+ if (hosts == null) {
+ hosts = new HashSet<String>();
+ rackToNodes.put(rack, hosts);
+ }
+ hosts.add(host);
+ }
+
+ private static List<String> getHosts(List<String> racks) {
+ List<String> hosts = new ArrayList<String>();
+ for (String rack : racks) {
+ hosts.addAll(rackToNodes.get(rack));
+ }
+ return hosts;
+ }
+
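
A note on the design of the two helpers above: rackToNodes is a static map, so
it is shared across calls; getSplits() clears it once the splits have been
formed (see the @@ -214,6 +217,8 @@ hunk) so that entries from one invocation
do not leak into the next. getHosts() simply flattens the host sets of every
rack that contributed blocks to a split, which is how rack names such as "/r1"
are replaced by hostnames in the split locations.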
/**
* Accept a path only if any one of filters given in the
* constructor do.
Modified: hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java?rev=826573&r1=826572&r2=826573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java Mon Oct 19 01:25:01 2009
@@ -18,11 +18,6 @@
package org.apache.hadoop.mapred.lib;
import java.io.IOException;
-import java.io.DataOutputStream;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Random;
import junit.framework.TestCase;
@@ -30,17 +25,12 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -151,14 +141,14 @@
assertEquals(fileSplit.getPath(1).getName(), file2.getName());
assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
assertEquals(fileSplit.getLength(1), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "/r2");
+ assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
fileSplit = (CombineFileSplit) splits[1];
assertEquals(fileSplit.getNumPaths(), 1);
assertEquals(fileSplit.getLocations().length, 1);
assertEquals(fileSplit.getPath(0).getName(), file1.getName());
assertEquals(fileSplit.getOffset(0), 0);
assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "/r1");
+ assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
// create another file on 3 datanodes and 3 racks.
dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
@@ -186,7 +176,7 @@
assertEquals(fileSplit.getPath(2).getName(), file3.getName());
assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
assertEquals(fileSplit.getLength(2), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "/r3");
+ assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
fileSplit = (CombineFileSplit) splits[1];
assertEquals(fileSplit.getNumPaths(), 2);
assertEquals(fileSplit.getLocations().length, 1);
@@ -196,14 +186,14 @@
assertEquals(fileSplit.getPath(1).getName(), file2.getName());
assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
assertEquals(fileSplit.getLength(1), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "/r2");
+ assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
fileSplit = (CombineFileSplit) splits[2];
assertEquals(fileSplit.getNumPaths(), 1);
assertEquals(fileSplit.getLocations().length, 1);
assertEquals(fileSplit.getPath(0).getName(), file1.getName());
assertEquals(fileSplit.getOffset(0), 0);
assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "/r1");
+ assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
// create file4 on all three racks
Path file4 = new Path(dir4 + "/file4");
@@ -229,7 +219,7 @@
assertEquals(fileSplit.getPath(2).getName(), file3.getName());
assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
assertEquals(fileSplit.getLength(2), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "/r3");
+ assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
fileSplit = (CombineFileSplit) splits[1];
assertEquals(fileSplit.getNumPaths(), 2);
assertEquals(fileSplit.getLocations().length, 1);
@@ -239,14 +229,14 @@
assertEquals(fileSplit.getPath(1).getName(), file2.getName());
assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
assertEquals(fileSplit.getLength(1), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "/r2");
+ assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
fileSplit = (CombineFileSplit) splits[2];
assertEquals(fileSplit.getNumPaths(), 1);
assertEquals(fileSplit.getLocations().length, 1);
assertEquals(fileSplit.getPath(0).getName(), file1.getName());
assertEquals(fileSplit.getOffset(0), 0);
assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "/r1");
+ assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
// maximum split size is 2 blocks
inFormat = new DummyInputFormat();
@@ -385,7 +375,7 @@
assertEquals(fileSplit.getPath(0).getName(), file1.getName());
assertEquals(fileSplit.getOffset(0), 0);
assertEquals(fileSplit.getLength(0), BLOCKSIZE);
- assertEquals(fileSplit.getLocations()[0], "/r1");
+ assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
// maximum split size is 7 blocks and min is 3 blocks
inFormat = new DummyInputFormat();
@@ -431,15 +421,15 @@
fileSplit = (CombineFileSplit) splits[0];
assertEquals(fileSplit.getNumPaths(), 2);
assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getLocations()[0], "/r2");
+ assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
fileSplit = (CombineFileSplit) splits[1];
assertEquals(fileSplit.getNumPaths(), 1);
assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getLocations()[0], "/r1");
+ assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
fileSplit = (CombineFileSplit) splits[2];
assertEquals(fileSplit.getNumPaths(), 6);
assertEquals(fileSplit.getLocations().length, 1);
- assertEquals(fileSplit.getLocations()[0], "/r3");
+ assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
} finally {
if (dfs != null) {
dfs.shutdown();