Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/02/27 19:53:04 UTC
svn commit: r1450915 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/...
Author: vinodkv
Date: Wed Feb 27 18:53:03 2013
New Revision: 1450915
URL: http://svn.apache.org/r1450915
Log:
MAPREDUCE-4892. Modify CombineFileInputFormat to not skew input splits' allocation on small clusters. Contributed by Bikas Saha.
svn merge --ignore-ancestry -c 1450912 ../../trunk/
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
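The heart of this change is a new per-round cap inside createSplits(): each node may create at most its fair share of the remaining node-local splits per round, and nodes holding leftover blocks are revisited in later rounds. A minimal standalone sketch of that cap computation follows; the class and method names here are hypothetical, and the real code inlines this at the top of the while loop shown in the diff below:

    public class SplitCapSketch {
      // Mirrors the avgSplitsPerNode / maxSplitsByNodeOnly computation
      // added to CombineFileInputFormat.createSplits() in this commit.
      static int maxSplitsByNodeOnly(long totalLength, long maxSize, int numNodes) {
        // maxSize may legitimately be 0; smoothing is disabled in that case.
        int avgSplitsPerNode = (maxSize > 0 && numNodes > 0)
            ? (int) (totalLength / maxSize) / numNodes
            : Integer.MAX_VALUE;
        // Always allow at least one split per node so progress is made.
        return avgSplitsPerNode > 0 ? avgSplitsPerNode : 1;
      }

      public static void main(String[] args) {
        // 1200 bytes of input, 200-byte max splits, 2 nodes -> cap of 3.
        System.out.println(maxSplitsByNodeOnly(1200L, 200L, 2));
      }
    }

With the numbers used by the new regression test (12 blocks of 100 bytes, maxSize 200, 2 nodes), the cap works out to 3 node-local splits per node per round.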
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1450915&r1=1450914&r2=1450915&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Wed Feb 27 18:53:03 2013
@@ -11,6 +11,9 @@ Release 2.0.4-beta - UNRELEASED
MAPREDUCE-5033. mapred shell script should respect usage flags
(--help -help -h). (Andrew Wang via atm)
+ MAPREDUCE-4892. Modify CombineFileInputFormat to not skew input splits'
+ allocation on small clusters. (Bikas Saha via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
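The grouping behavior patched below is driven by three knobs: the maximum combined split size and the per-node and per-rack minimums. As a minimal sketch of setting them on a job, assuming the standard FileInputFormat max-size key; the per-node and per-rack keys are the SPLIT_MINSIZE_PERNODE and SPLIT_MINSIZE_PERRACK constants declared in the class below, and the byte values here are arbitrary examples, not recommendations:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;

    public class CombineSplitConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Upper bound per combined split; 0 leaves splits unbounded and,
        // per the new comment in createSplits(), disables load smoothing.
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize",
            256L * 1024 * 1024);
        // Minimum bytes before a node-local-only split may be emitted.
        conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE,
            64L * 1024 * 1024);
        // Minimum bytes before a rack-level split may be emitted.
        conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK,
            64L * 1024 * 1024);
      }
    }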
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1450915&r1=1450914&r2=1450915&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Wed Feb 27 18:53:03 2013
@@ -49,6 +49,8 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.NetworkTopology;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in
* {@link InputFormat#getSplits(JobContext)} method.
@@ -76,7 +78,7 @@ import org.apache.hadoop.net.NetworkTopo
@InterfaceStability.Stable
public abstract class CombineFileInputFormat<K, V>
extends FileInputFormat<K, V> {
-
+
public static final String SPLIT_MINSIZE_PERNODE =
"mapreduce.input.fileinputformat.split.minsize.per.node";
public static final String SPLIT_MINSIZE_PERRACK =
@@ -163,7 +165,6 @@ public abstract class CombineFileInputFo
@Override
public List<InputSplit> getSplits(JobContext job)
throws IOException {
-
long minSizeNode = 0;
long minSizeRack = 0;
long maxSize = 0;
@@ -286,56 +287,100 @@ public abstract class CombineFileInputFo
rackToNodes, maxSize);
totLength += files[i].getLength();
}
+ createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
+ maxSize, minSizeNode, minSizeRack, splits);
+ }
+ @VisibleForTesting
+ void createSplits(HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+ HashMap<OneBlockInfo, String[]> blockToNodes,
+ HashMap<String, List<OneBlockInfo>> rackToBlocks,
+ long totLength,
+ long maxSize,
+ long minSizeNode,
+ long minSizeRack,
+ List<InputSplit> splits
+ ) {
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
Set<String> nodes = new HashSet<String>();
long curSplitSize = 0;
+
+ int numNodes = nodeToBlocks.size();
+ long totalLength = totLength;
- // process all nodes and create splits that are local
- // to a node.
- for (Iterator<Map.Entry<String,
- List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
- iter.hasNext();) {
-
- Map.Entry<String, List<OneBlockInfo>> one = iter.next();
- nodes.add(one.getKey());
- List<OneBlockInfo> blocksInNode = one.getValue();
-
- // for each block, copy it into validBlocks. Delete it from
- // blockToNodes so that the same block does not appear in
- // two different splits.
- for (OneBlockInfo oneblock : blocksInNode) {
- if (blockToNodes.containsKey(oneblock)) {
- validBlocks.add(oneblock);
- blockToNodes.remove(oneblock);
- curSplitSize += oneblock.length;
-
- // if the accumulated split size exceeds the maximum, then
- // create this split.
- if (maxSize != 0 && curSplitSize >= maxSize) {
- // create an input split and add it to the splits array
- addCreatedSplit(splits, nodes, validBlocks);
- curSplitSize = 0;
- validBlocks.clear();
+ while(true) {
+ // it is allowed for maxSize to be 0. Disable load smoothing for such cases.
+ int avgSplitsPerNode = maxSize > 0 && numNodes > 0 ?
+ ((int) (totalLength/maxSize))/numNodes
+ : Integer.MAX_VALUE;
+ int maxSplitsByNodeOnly = (avgSplitsPerNode > 0) ? avgSplitsPerNode : 1;
+ numNodes = 0;
+
+ // process all nodes and create splits that are local to a node.
+ for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
+ .entrySet().iterator(); iter.hasNext();) {
+ Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+ nodes.add(one.getKey());
+ List<OneBlockInfo> blocksInNode = one.getValue();
+
+ // for each block, copy it into validBlocks. Delete it from
+ // blockToNodes so that the same block does not appear in
+ // two different splits.
+ int splitsInNode = 0;
+ for (OneBlockInfo oneblock : blocksInNode) {
+ if (blockToNodes.containsKey(oneblock)) {
+ validBlocks.add(oneblock);
+ blockToNodes.remove(oneblock);
+ curSplitSize += oneblock.length;
+
+ // if the accumulated split size exceeds the maximum, then
+ // create this split.
+ if (maxSize != 0 && curSplitSize >= maxSize) {
+ // create an input split and add it to the splits array
+ addCreatedSplit(splits, nodes, validBlocks);
+ totalLength -= curSplitSize;
+ curSplitSize = 0;
+ validBlocks.clear();
+ splitsInNode++;
+ if (splitsInNode == maxSplitsByNodeOnly) {
+ // stop grouping on a node so as not to create
+ // disproportionately more splits on a node just because it
+ // happens to have many blocks.
+ // Consider only these nodes in the next round of grouping because
+ // they have leftover blocks that may need to be grouped.
+ numNodes++;
+ break;
+ }
+ }
}
}
- }
- // if there were any blocks left over and their combined size is
- // larger than minSplitNode, then combine them into one split.
- // Otherwise add them back to the unprocessed pool. It is likely
- // that they will be combined with other blocks from the
- // same rack later on.
- if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
- // create an input split and add it to the splits array
- addCreatedSplit(splits, nodes, validBlocks);
- } else {
- for (OneBlockInfo oneblock : validBlocks) {
- blockToNodes.put(oneblock, oneblock.hosts);
+ // if there were any blocks left over and their combined size is
+ // larger than minSplitNode, then combine them into one split.
+ // Otherwise add them back to the unprocessed pool. It is likely
+ // that they will be combined with other blocks from the
+ // same rack later on.
+ if (minSizeNode != 0 && curSplitSize >= minSizeNode
+ && splitsInNode == 0) {
+ // haven't created any split on this machine yet, so it is ok to
+ // add a smaller one for parallelism. Otherwise group it in the
+ // rack for balanced size.
+ // create an input split and add it to the splits array
+ addCreatedSplit(splits, nodes, validBlocks);
+ totalLength -= curSplitSize;
+ } else {
+ for (OneBlockInfo oneblock : validBlocks) {
+ blockToNodes.put(oneblock, oneblock.hosts);
+ }
}
+ validBlocks.clear();
+ nodes.clear();
+ curSplitSize = 0;
+ }
+
+ if(!(numNodes>0 && totalLength>0)) {
+ break;
}
- validBlocks.clear();
- nodes.clear();
- curSplitSize = 0;
}
// if blocks in a rack are below the specified minimum size, then keep them
@@ -458,7 +503,6 @@ public abstract class CombineFileInputFo
offset[i] = validBlocks.get(i).offset;
length[i] = validBlocks.get(i).length;
}
-
// add this split to the list that is returned
CombineFileSplit thissplit = new CombineFileSplit(fl, offset,
length, locations.toArray(new String[0]));
@@ -474,7 +518,8 @@ public abstract class CombineFileInputFo
/**
* information about one file from the File System
*/
- private static class OneFileInfo {
+ @VisibleForTesting
+ static class OneFileInfo {
private long fileSize; // size of the file
private OneBlockInfo[] blocks; // all blocks in this file
@@ -545,45 +590,55 @@ public abstract class CombineFileInputFo
}
blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
}
+
+ populateBlockInfo(blocks, rackToBlocks, blockToNodes,
+ nodeToBlocks, rackToNodes);
+ }
+ }
+
+ @VisibleForTesting
+ static void populateBlockInfo(OneBlockInfo[] blocks,
+ HashMap<String, List<OneBlockInfo>> rackToBlocks,
+ HashMap<OneBlockInfo, String[]> blockToNodes,
+ HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+ HashMap<String, Set<String>> rackToNodes) {
+ for (OneBlockInfo oneblock : blocks) {
+ // add this block to the block --> node locations map
+ blockToNodes.put(oneblock, oneblock.hosts);
+
+ // For blocks that do not have host/rack information,
+ // assign to default rack.
+ String[] racks = null;
+ if (oneblock.hosts.length == 0) {
+ racks = new String[]{NetworkTopology.DEFAULT_RACK};
+ } else {
+ racks = oneblock.racks;
+ }
- for (OneBlockInfo oneblock : blocks) {
- // add this block to the block --> node locations map
- blockToNodes.put(oneblock, oneblock.hosts);
-
- // For blocks that do not have host/rack information,
- // assign to default rack.
- String[] racks = null;
- if (oneblock.hosts.length == 0) {
- racks = new String[]{NetworkTopology.DEFAULT_RACK};
- } else {
- racks = oneblock.racks;
+ // add this block to the rack --> block map
+ for (int j = 0; j < racks.length; j++) {
+ String rack = racks[j];
+ List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+ if (blklist == null) {
+ blklist = new ArrayList<OneBlockInfo>();
+ rackToBlocks.put(rack, blklist);
}
-
- // add this block to the rack --> block map
- for (int j = 0; j < racks.length; j++) {
- String rack = racks[j];
- List<OneBlockInfo> blklist = rackToBlocks.get(rack);
- if (blklist == null) {
- blklist = new ArrayList<OneBlockInfo>();
- rackToBlocks.put(rack, blklist);
- }
- blklist.add(oneblock);
- if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
- // Add this host to rackToNodes map
- addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
- }
+ blklist.add(oneblock);
+ if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+ // Add this host to rackToNodes map
+ addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
}
+ }
- // add this block to the node --> block map
- for (int j = 0; j < oneblock.hosts.length; j++) {
- String node = oneblock.hosts[j];
- List<OneBlockInfo> blklist = nodeToBlocks.get(node);
- if (blklist == null) {
- blklist = new ArrayList<OneBlockInfo>();
- nodeToBlocks.put(node, blklist);
- }
- blklist.add(oneblock);
+ // add this block to the node --> block map
+ for (int j = 0; j < oneblock.hosts.length; j++) {
+ String node = oneblock.hosts[j];
+ List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+ if (blklist == null) {
+ blklist = new ArrayList<OneBlockInfo>();
+ nodeToBlocks.put(node, blklist);
}
+ blklist.add(oneblock);
}
}
}
@@ -600,7 +655,8 @@ public abstract class CombineFileInputFo
/**
* information about one block from the File System
*/
- private static class OneBlockInfo {
+ @VisibleForTesting
+ static class OneBlockInfo {
Path onepath; // name of this file
long offset; // offset in file
long length; // length of this block
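To see why the capped loop no longer skews allocation, here is a toy simulation of the grouping rounds under loud simplifying assumptions: all blocks are the same size, every block is replicated on every node, and the total length is an exact multiple of maxSize so no per-node leftovers arise. Names are hypothetical, and this is an illustration of the technique, not the real implementation:

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Deque;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class CappedGroupingDemo {
      public static void main(String[] args) {
        long blockSize = 100, maxSize = 200;
        int numBlocks = 12;
        List<String> nodes = Arrays.asList("h1", "h2");

        // Shared pool standing in for blockToNodes: every node sees every block.
        Deque<Long> pool = new ArrayDeque<Long>();
        for (int i = 0; i < numBlocks; i++) pool.add(blockSize);

        long totalLength = numBlocks * blockSize;
        Map<String, Integer> splitsPerNode = new LinkedHashMap<String, Integer>();

        while (totalLength > 0) {
          // Per-round cap: each node's fair share of the remaining splits.
          int cap = Math.max((int) (totalLength / maxSize) / nodes.size(), 1);
          for (String node : nodes) {
            int created = 0;
            long curSplitSize = 0;
            while (!pool.isEmpty() && created < cap) {
              curSplitSize += pool.poll();
              if (curSplitSize >= maxSize) {
                Integer prev = splitsPerNode.get(node);
                splitsPerNode.put(node, prev == null ? 1 : prev + 1);
                totalLength -= curSplitSize;
                curSplitSize = 0;
                created++; // stop at the cap so the other nodes get a turn
              }
            }
          }
        }
        System.out.println(splitsPerNode); // prints {h1=3, h2=3}
      }
    }

Without the cap, the first node iterated would drain the shared block pool into its own splits and end up with all six; with it, each round hands every node at most its fair share, which is exactly what the new testNodeInputSplit below asserts through the real populateBlockInfo() and createSplits() hooks.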
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1450915&r1=1450914&r2=1450915&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Wed Feb 27 18:53:03 2013
@@ -20,11 +20,14 @@ package org.apache.hadoop.mapreduce.lib.
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
+import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
+import java.util.Set;
import java.util.zip.GZIPOutputStream;
import java.util.concurrent.TimeoutException;
+import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.hadoop.fs.*;
@@ -42,9 +45,13 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneBlockInfo;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneFileInfo;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.junit.Test;
+import com.google.common.collect.HashMultiset;
+
public class TestCombineFileInputFormat extends TestCase {
private static final String rack1[] = new String[] {
@@ -476,23 +483,23 @@ public class TestCombineFileInputFormat
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
- assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(0));
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file4.getName(), fileSplit.getPath(1).getName());
- assertEquals(0, fileSplit.getOffset(1));
+ assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+ assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
fileSplit = (CombineFileSplit) splits.get(2);
assertEquals(2, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file4.getName(), fileSplit.getPath(0).getName());
- assertEquals(BLOCKSIZE, fileSplit.getOffset(0));
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+ assertEquals(file3.getName(), fileSplit.getPath(1).getName());
assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(1));
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+ assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
// maximum split size is 3 blocks
inFormat = new DummyInputFormat();
@@ -504,7 +511,7 @@ public class TestCombineFileInputFormat
for (InputSplit split : splits) {
System.out.println("File split(Test5): " + split);
}
- assertEquals(4, splits.size());
+ assertEquals(3, splits.size());
fileSplit = (CombineFileSplit) splits.get(0);
assertEquals(3, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
@@ -519,32 +526,28 @@ public class TestCombineFileInputFormat
assertEquals(BLOCKSIZE, fileSplit.getLength(2));
assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+ assertEquals(file2.getName(), fileSplit.getPath(1).getName());
assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
assertEquals(file4.getName(), fileSplit.getPath(2).getName());
- assertEquals( 2 * BLOCKSIZE, fileSplit.getOffset(2));
+ assertEquals(0, fileSplit.getOffset(2));
assertEquals(BLOCKSIZE, fileSplit.getLength(2));
- assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+ assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(3, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+ assertEquals(file4.getName(), fileSplit.getPath(1).getName());
assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
- fileSplit = (CombineFileSplit) splits.get(3);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file4.getName(), fileSplit.getPath(2).getName());
+ assertEquals(2*BLOCKSIZE, fileSplit.getOffset(2));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(2));
assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
// maximum split size is 4 blocks
@@ -713,6 +716,56 @@ public class TestCombineFileInputFormat
DFSTestUtil.waitReplication(fileSys, name, replication);
}
+ public void testNodeInputSplit() throws IOException, InterruptedException {
+ // Regression test for MAPREDUCE-4892. There are 2 nodes with all blocks on
+ // both nodes. The grouping ensures that both nodes get splits instead of
+ // just the first node
+ DummyInputFormat inFormat = new DummyInputFormat();
+ int numBlocks = 12;
+ long totLength = 0;
+ long blockSize = 100;
+ long maxSize = 200;
+ long minSizeNode = 50;
+ long minSizeRack = 50;
+ String[] locations = { "h1", "h2" };
+ String[] racks = new String[0];
+ Path path = new Path("hdfs://file");
+
+ OneBlockInfo[] blocks = new OneBlockInfo[numBlocks];
+ for(int i=0; i<numBlocks; ++i) {
+ blocks[i] = new OneBlockInfo(path, i*blockSize, blockSize, locations, racks);
+ totLength += blockSize;
+ }
+
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ HashMap<String, Set<String>> rackToNodes =
+ new HashMap<String, Set<String>>();
+ HashMap<String, List<OneBlockInfo>> rackToBlocks =
+ new HashMap<String, List<OneBlockInfo>>();
+ HashMap<OneBlockInfo, String[]> blockToNodes =
+ new HashMap<OneBlockInfo, String[]>();
+ HashMap<String, List<OneBlockInfo>> nodeToBlocks =
+ new HashMap<String, List<OneBlockInfo>>();
+
+ OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes,
+ nodeToBlocks, rackToNodes);
+
+ inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
+ maxSize, minSizeNode, minSizeRack, splits);
+
+ int expectedSplitCount = (int)(totLength/maxSize);
+ Assert.assertEquals(expectedSplitCount, splits.size());
+ HashMultiset<String> nodeSplits = HashMultiset.create();
+ for(int i=0; i<expectedSplitCount; ++i) {
+ InputSplit inSplit = splits.get(i);
+ Assert.assertEquals(maxSize, inSplit.getLength());
+ Assert.assertEquals(1, inSplit.getLocations().length);
+ nodeSplits.add(inSplit.getLocations()[0]);
+ }
+ Assert.assertEquals(3, nodeSplits.count(locations[0]));
+ Assert.assertEquals(3, nodeSplits.count(locations[1]));
+ }
+
public void testSplitPlacementForCompressedFiles() throws Exception {
MiniDFSCluster dfs = null;
FileSystem fileSys = null;
@@ -889,24 +942,24 @@ public class TestCombineFileInputFormat
assertEquals(f3.getLen(), fileSplit.getLength(0));
assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f4.getLen(), fileSplit.getLength(0));
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
fileSplit = (CombineFileSplit) splits.get(2);
assertEquals(1, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f2.getLen(), fileSplit.getLength(0));
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
fileSplit = (CombineFileSplit) splits.get(3);
assertEquals(1, fileSplit.getNumPaths());
assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(file4.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f1.getLen(), fileSplit.getLength(0));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+ assertEquals(f4.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
// maximum split size is twice file1's length
inFormat = new DummyInputFormat();