You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by oz...@apache.org on 2015/05/05 03:23:59 UTC
hadoop git commit: MAPREDUCE-6165. [JDK8] TestCombineFileInputFormat
failed on JDK8. Contributed by Akira AJISAKA.
Repository: hadoop
Updated Branches:
refs/heads/trunk d701acc9c -> 551615fa1
MAPREDUCE-6165. [JDK8] TestCombineFileInputFormat failed on JDK8. Contributed by Akira AJISAKA.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/551615fa
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/551615fa
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/551615fa
Branch: refs/heads/trunk
Commit: 551615fa13f65ae996bae9c1bacff189539b6557
Parents: d701acc
Author: Tsuyoshi Ozawa <oz...@apache.org>
Authored: Tue May 5 10:23:13 2015 +0900
Committer: Tsuyoshi Ozawa <oz...@apache.org>
Committed: Tue May 5 10:23:13 2015 +0900
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../lib/input/CombineFileInputFormat.java | 26 +-
.../lib/input/TestCombineFileInputFormat.java | 1138 ++++++++++++------
3 files changed, 805 insertions(+), 362 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/551615fa/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 481757a..002fbe6 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -368,6 +368,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge
(Gera Shegalov via jlowe)
+ MAPREDUCE-6165. [JDK8] TestCombineFileInputFormat failed on JDK8.
+ (Akira AJISAKA via ozawa)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/551615fa/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
index 040c54b..b2b7656 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
import java.util.Set;
import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -289,6 +288,26 @@ public abstract class CombineFileInputFormat<K, V>
maxSize, minSizeNode, minSizeRack, splits);
}
+ /**
+ * Process all the nodes and create splits that are local to a node.
+ * Generate one split per node iteration, and walk over nodes multiple times
+ * to distribute the splits across nodes.
+ * <p>
+ * Note: The order in which the nodes are processed is undefined because
+ * nodeToBlocks is implemented as a {@link java.util.HashMap}, whose entry
+ * iteration order is unspecified.
+ * @param nodeToBlocks Mapping from a node to the list of blocks that
+ * it contains.
+ * @param blockToNodes Mapping from a block to the nodes on which
+ * it has replicas.
+ * @param rackToBlocks Mapping from a rack name to the list of blocks it has.
+ * @param totLength Total length of the input files.
+ * @param maxSize Max size of each split.
+ * If set to 0, disable smoothing load.
+ * @param minSizeNode Minimum split size per node.
+ * @param minSizeRack Minimum split size per rack.
+ * @param splits New splits created by this method are added to the list.
+ */
@VisibleForTesting
void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
Map<OneBlockInfo, String[]> blockToNodes,
@@ -309,11 +328,6 @@ public abstract class CombineFileInputFormat<K, V>
Set<String> completedNodes = new HashSet<String>();
while(true) {
- // it is allowed for maxSize to be 0. Disable smoothing load for such cases
-
- // process all nodes and create splits that are local to a node. Generate
- // one split per node iteration, and walk over nodes multiple times to
- // distribute the splits across nodes.
for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
.entrySet().iterator(); iter.hasNext();) {
Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/551615fa/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
index 85c675c..b49f2d8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
@@ -22,6 +22,7 @@ import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -53,13 +54,22 @@ import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneBlockInfo;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneFileInfo;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import com.google.common.collect.HashMultiset;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
public class TestCombineFileInputFormat {
@@ -92,6 +102,14 @@ public class TestCombineFileInputFormat {
static final int BLOCKSIZE = 1024;
static final byte[] databuf = new byte[BLOCKSIZE];
+ @Mock
+ private List<String> mockList;
+
+ @Before
+ public void initMocks() {
+ MockitoAnnotations.initMocks(this);
+ }
+
private static final String DUMMY_FS_URI = "dummyfs:///";
/** Dummy class to extend CombineFileInputFormat*/
@@ -299,7 +317,51 @@ public class TestCombineFileInputFormat {
assertFalse(rr.nextKeyValue());
}
+ /**
+ * Expected name, length, and offset of one path entry in a CombineFileSplit,
+ * used to compare splits independently of the order they are generated in.
+ */
+ private static final class Split {
+ private final String name;
+ private final long length;
+ private final long offset;
+
+ public Split(String name, long length, long offset) {
+ this.name = name;
+ this.length = length;
+ this.offset = offset;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public long getLength() {
+ return length;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof Split) {
+ Split split = ((Split) obj);
+ return split.name.equals(name) && split.length == length
+ && split.offset == offset;
+ }
+ return false;
+ }
+
+ // equals() is overridden, so hashCode() must agree with it: instances
+ // are stored in HashSets by testSplitPlacement.
+ @Override
+ public int hashCode() {
+ int result = name.hashCode();
+ result = 31 * result + (int) (length ^ (length >>> 32));
+ result = 31 * result + (int) (offset ^ (offset >>> 32));
+ return result;
+ }
+ }
+
+ /**
+ * The test suppresses unchecked warnings from
+ * {@link org.mockito.Mockito#reset}. Although calling reset is generally
+ * considered bad practice, we call it instead of splitting the test
+ * (i.e. restarting MiniDFSCluster) to save time.
+ */
@Test
+ @SuppressWarnings("unchecked")
public void testSplitPlacement() throws Exception {
MiniDFSCluster dfs = null;
FileSystem fileSys = null;
@@ -326,10 +388,10 @@ public class TestCombineFileInputFormat {
throw new IOException("Mkdirs failed to create " + inDir.toString());
}
Path file1 = new Path(dir1 + "/file1");
- writeFile(conf, file1, (short)1, 1);
+ writeFile(conf, file1, (short) 1, 1);
// create another file on the same datanode
Path file5 = new Path(dir5 + "/file5");
- writeFile(conf, file5, (short)1, 1);
+ writeFile(conf, file5, (short) 1, 1);
// split it using a CombinedFile input format
DummyInputFormat inFormat = new DummyInputFormat();
Job job = Job.getInstance(conf);
@@ -350,13 +412,13 @@ public class TestCombineFileInputFormat {
assertEquals(0, fileSplit.getOffset(1));
assertEquals(BLOCKSIZE, fileSplit.getLength(1));
assertEquals(hosts1[0], fileSplit.getLocations()[0]);
-
+
dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
dfs.waitActive();
// create file on two datanodes.
Path file2 = new Path(dir2 + "/file2");
- writeFile(conf, file2, (short)2, 2);
+ writeFile(conf, file2, (short) 2, 2);
// split it using a CombinedFile input format
inFormat = new DummyInputFormat();
@@ -365,34 +427,67 @@ public class TestCombineFileInputFormat {
splits = inFormat.getSplits(job);
System.out.println("Made splits(Test1): " + splits.size());
- // make sure that each split has different locations
for (InputSplit split : splits) {
System.out.println("File split(Test1): " + split);
}
- assertEquals(2, splits.size());
- fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file2.getName(), fileSplit.getPath(1).getName());
- assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
- assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ for (InputSplit split : splits) {
+ fileSplit = (CombineFileSplit) split;
+ /**
+ * If rack1 is processed first by
+ * {@link CombineFileInputFormat#createSplits},
+ * create only one split on rack1. Otherwise create two splits.
+ */
+ if (splits.size() == 2) {
+ // first split is on rack2, contains file2
+ if (split.equals(splits.get(0))) {
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]);
+ }
+ // second split is on rack1, contains file1
+ if (split.equals(splits.get(1))) {
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ }
+ } else if (splits.size() == 1) {
+ // the only split is on rack1, contains file1 and file2.
+ assertEquals(3, fileSplit.getNumPaths());
+ Set<Split> expected = new HashSet<>();
+ expected.add(new Split(file1.getName(), BLOCKSIZE, 0));
+ expected.add(new Split(file2.getName(), BLOCKSIZE, 0));
+ expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE));
+ List<Split> actual = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ assertTrue(actual.containsAll(expected));
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ } else {
+ fail("Expected split size is 1 or 2, but actual size is "
+ + splits.size());
+ }
+ }
// create another file on 3 datanodes and 3 racks.
dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
dfs.waitActive();
Path file3 = new Path(dir3 + "/file3");
- writeFile(conf, new Path(dir3 + "/file3"), (short)3, 3);
+ writeFile(conf, new Path(dir3 + "/file3"), (short) 3, 3);
inFormat = new DummyInputFormat();
FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
inFormat.setMinSplitSizeRack(BLOCKSIZE);
@@ -400,37 +495,98 @@ public class TestCombineFileInputFormat {
for (InputSplit split : splits) {
System.out.println("File split(Test2): " + split);
}
- assertEquals(3, splits.size());
- fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(3, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file3.getName(), fileSplit.getPath(1).getName());
- assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
- assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals(file3.getName(), fileSplit.getPath(2).getName());
- assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
- assertEquals(BLOCKSIZE, fileSplit.getLength(2));
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file2.getName(), fileSplit.getPath(1).getName());
- assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
- assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ Set<Split> expected = new HashSet<>();
+ expected.add(new Split(file1.getName(), BLOCKSIZE, 0));
+ expected.add(new Split(file2.getName(), BLOCKSIZE, 0));
+ expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE));
+ expected.add(new Split(file3.getName(), BLOCKSIZE, 0));
+ expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE));
+ expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE * 2));
+ List<Split> actual = new ArrayList<>();
+
+ for (InputSplit split : splits) {
+ fileSplit = (CombineFileSplit) split;
+ /**
+ * If rack1 is processed first by
+ * {@link CombineFileInputFormat#createSplits},
+ * create only one split on rack1.
+ * If rack2 or rack3 is processed first and rack1 is processed second,
+ * create one split on rack2 or rack3 and another split on rack1.
+ * Otherwise create 3 splits, one for each rack.
+ */
+ if (splits.size() == 3) {
+ // first split is on rack3, contains file3
+ if (split.equals(splits.get(0))) {
+ assertEquals(3, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file3.getName(), fileSplit.getPath(1).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(file3.getName(), fileSplit.getPath(2).getName());
+ assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(2));
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+ }
+ // second split is on rack2, contains file2
+ if (split.equals(splits.get(1))) {
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]);
+ }
+ // third split is on rack1, contains file1
+ if (split.equals(splits.get(2))) {
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ }
+ } else if (splits.size() == 2) {
+ // first split is on rack2 or rack3, contains one or two files.
+ if (split.equals(splits.get(0))) {
+ assertEquals(1, fileSplit.getLocations().length);
+ if (fileSplit.getLocations()[0].equals(hosts2[0])) {
+ assertEquals(2, fileSplit.getNumPaths());
+ } else if (fileSplit.getLocations()[0].equals(hosts3[0])) {
+ assertEquals(3, fileSplit.getNumPaths());
+ } else {
+ fail("First split should be on rack2 or rack3.");
+ }
+ }
+ // second split is on rack1, contains the remaining files.
+ if (split.equals(splits.get(1))) {
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ }
+ } else if (splits.size() == 1) {
+ // the only split is on rack1, contains all three files.
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(6, fileSplit.getNumPaths());
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ } else {
+ fail("Split size should be 1, 2, or 3.");
+ }
+ for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ }
+
+ assertEquals(6, actual.size());
+ assertTrue(actual.containsAll(expected));
// create file4 on all three racks
Path file4 = new Path(dir4 + "/file4");
@@ -442,37 +598,85 @@ public class TestCombineFileInputFormat {
for (InputSplit split : splits) {
System.out.println("File split(Test3): " + split);
}
- assertEquals(3, splits.size());
- fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(6, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file3.getName(), fileSplit.getPath(1).getName());
- assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
- assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals(file3.getName(), fileSplit.getPath(2).getName());
- assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
- assertEquals(BLOCKSIZE, fileSplit.getLength(2));
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file2.getName(), fileSplit.getPath(1).getName());
- assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
- assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ expected.add(new Split(file4.getName(), BLOCKSIZE, 0));
+ expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE));
+ expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE * 2));
+ actual.clear();
+
+ for (InputSplit split : splits) {
+ fileSplit = (CombineFileSplit) split;
+ /**
+ * If rack1 is processed first by
+ * {@link CombineFileInputFormat#createSplits},
+ * create only one split on rack1.
+ * If rack2 or rack3 is processed first and rack1 is processed second,
+ * create one split on rack2 or rack3 and another split on rack1.
+ * Otherwise create 3 splits, one for each rack.
+ */
+ if (splits.size() == 3) {
+ // first split is on rack3, contains file3 and file4
+ if (split.equals(splits.get(0))) {
+ assertEquals(6, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+ }
+ // second split is on rack2, contains file2
+ if (split.equals(splits.get(1))) {
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]);
+ }
+ // third split is on rack1, contains file1
+ if (split.equals(splits.get(2))) {
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ }
+ } else if (splits.size() == 2) {
+ // first split is on rack2 or rack3, contains two or three files.
+ if (split.equals(splits.get(0))) {
+ assertEquals(1, fileSplit.getLocations().length);
+ if (fileSplit.getLocations()[0].equals(hosts2[0])) {
+ assertEquals(5, fileSplit.getNumPaths());
+ } else if (fileSplit.getLocations()[0].equals(hosts3[0])) {
+ assertEquals(6, fileSplit.getNumPaths());
+ } else {
+ fail("First split should be on rack2 or rack3.");
+ }
+ }
+ // second split is on rack1, contains the remaining files.
+ if (split.equals(splits.get(1))) {
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ }
+ } else if (splits.size() == 1) {
+ // the only split is on rack1, contains all four files.
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(9, fileSplit.getNumPaths());
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ } else {
+ fail("Split size should be 1, 2, or 3.");
+ }
+ for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ }
+
+ assertEquals(9, actual.size());
+ assertTrue(actual.containsAll(expected));
// maximum split size is 2 blocks
inFormat = new DummyInputFormat();
@@ -485,34 +689,26 @@ public class TestCombineFileInputFormat {
System.out.println("File split(Test4): " + split);
}
assertEquals(5, splits.size());
- fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file3.getName(), fileSplit.getPath(1).getName());
- assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
- assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file2.getName(), fileSplit.getPath(1).getName());
- assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
- assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file3.getName(), fileSplit.getPath(1).getName());
- assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(1));
- assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
+
+ actual.clear();
+ reset(mockList);
+ for (InputSplit split : splits) {
+ fileSplit = (CombineFileSplit) split;
+ for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ mockList.add(fileSplit.getLocations()[0]);
+ }
+
+ assertEquals(9, actual.size());
+ assertTrue(actual.containsAll(expected));
+ // verify the splits are on all the racks
+ verify(mockList, atLeastOnce()).add(hosts1[0]);
+ verify(mockList, atLeastOnce()).add(hosts2[0]);
+ verify(mockList, atLeastOnce()).add(hosts3[0]);
// maximum split size is 3 blocks
inFormat = new DummyInputFormat();
@@ -524,44 +720,26 @@ public class TestCombineFileInputFormat {
for (InputSplit split : splits) {
System.out.println("File split(Test5): " + split);
}
+
assertEquals(3, splits.size());
- fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(3, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file3.getName(), fileSplit.getPath(1).getName());
- assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
- assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals(file3.getName(), fileSplit.getPath(2).getName());
- assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
- assertEquals(BLOCKSIZE, fileSplit.getLength(2));
- assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file2.getName(), fileSplit.getPath(1).getName());
- assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
- assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals(file4.getName(), fileSplit.getPath(2).getName());
- assertEquals(0, fileSplit.getOffset(2));
- assertEquals(BLOCKSIZE, fileSplit.getLength(2));
- assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(3, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file4.getName(), fileSplit.getPath(1).getName());
- assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
- assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals(file4.getName(), fileSplit.getPath(2).getName());
- assertEquals(2*BLOCKSIZE, fileSplit.getOffset(2));
- assertEquals(BLOCKSIZE, fileSplit.getLength(2));
- assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
+
+ actual.clear();
+ reset(mockList);
+ for (InputSplit split : splits) {
+ fileSplit = (CombineFileSplit) split;
+ for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ mockList.add(fileSplit.getLocations()[0]);
+ }
+
+ assertEquals(9, actual.size());
+ assertTrue(actual.containsAll(expected));
+ verify(mockList, atLeastOnce()).add(hosts1[0]);
+ verify(mockList, atLeastOnce()).add(hosts2[0]);
// maximum split size is 4 blocks
inFormat = new DummyInputFormat();
@@ -572,41 +750,23 @@ public class TestCombineFileInputFormat {
System.out.println("File split(Test6): " + split);
}
assertEquals(3, splits.size());
- fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(4, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file3.getName(), fileSplit.getPath(1).getName());
- assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
- assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals(file3.getName(), fileSplit.getPath(2).getName());
- assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
- assertEquals(BLOCKSIZE, fileSplit.getLength(2));
- assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(4, fileSplit.getNumPaths());
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(file2.getName(), fileSplit.getPath(1).getName());
- assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
- assertEquals(BLOCKSIZE, fileSplit.getLength(1));
- assertEquals(file4.getName(), fileSplit.getPath(2).getName());
- assertEquals(BLOCKSIZE, fileSplit.getOffset(2));
- assertEquals(BLOCKSIZE, fileSplit.getLength(2));
- assertEquals(file4.getName(), fileSplit.getPath(3).getName());
- assertEquals( 2 * BLOCKSIZE, fileSplit.getOffset(3));
- assertEquals(BLOCKSIZE, fileSplit.getLength(3));
- assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(BLOCKSIZE, fileSplit.getLength(0));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ actual.clear();
+ reset(mockList);
+ for (InputSplit split : splits) {
+ fileSplit = (CombineFileSplit) split;
+ for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ mockList.add(fileSplit.getLocations()[0]);
+ }
+
+ assertEquals(9, actual.size());
+ assertTrue(actual.containsAll(expected));
+ verify(mockList, atLeastOnce()).add(hosts1[0]);
// maximum split size is 7 blocks and min is 3 blocks
inFormat = new DummyInputFormat();
@@ -619,20 +779,31 @@ public class TestCombineFileInputFormat {
for (InputSplit split : splits) {
System.out.println("File split(Test7): " + split);
}
+
assertEquals(2, splits.size());
- fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(6, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(3, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
+
+ actual.clear();
+ reset(mockList);
+ for (InputSplit split : splits) {
+ fileSplit = (CombineFileSplit) split;
+ for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ mockList.add(fileSplit.getLocations()[0]);
+ }
+
+ assertEquals(9, actual.size());
+ assertTrue(actual.containsAll(expected));
+ verify(mockList, atLeastOnce()).add(hosts1[0]);
// Rack 1 has file1, file2 and file3 and file4
// Rack 2 has file2 and file3 and file4
// Rack 3 has file3 and file4
- // setup a filter so that only file1 and file2 can be combined
+ // setup a filter so that only (file1 and file2) or (file3 and file4)
+ // can be combined
inFormat = new DummyInputFormat();
FileInputFormat.addInputPath(job, inDir);
inFormat.setMinSplitSizeRack(1); // everything is at least rack local
@@ -642,19 +813,101 @@ public class TestCombineFileInputFormat {
for (InputSplit split : splits) {
System.out.println("File split(Test1): " + split);
}
- assertEquals(3, splits.size());
- fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(6, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+
+ for (InputSplit split : splits) {
+ fileSplit = (CombineFileSplit) split;
+ if (splits.size() == 2) {
+ // first split is on rack1, contains file1 and file2.
+ if (split.equals(splits.get(0))) {
+ assertEquals(3, fileSplit.getNumPaths());
+ expected.clear();
+ expected.add(new Split(file1.getName(), BLOCKSIZE, 0));
+ expected.add(new Split(file2.getName(), BLOCKSIZE, 0));
+ expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE));
+ actual.clear();
+ for (int i = 0; i < 3; i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ assertTrue(actual.containsAll(expected));
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ }
+ if (split.equals(splits.get(1))) {
+ // second split contains the file3 and file4, however,
+ // the locations is undetermined.
+ assertEquals(6, fileSplit.getNumPaths());
+ expected.clear();
+ expected.add(new Split(file3.getName(), BLOCKSIZE, 0));
+ expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE));
+ expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE * 2));
+ expected.add(new Split(file4.getName(), BLOCKSIZE, 0));
+ expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE));
+ expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE * 2));
+ actual.clear();
+ for (int i = 0; i < 6; i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ assertTrue(actual.containsAll(expected));
+ assertEquals(1, fileSplit.getLocations().length);
+ }
+ } else if (splits.size() == 3) {
+ if (split.equals(splits.get(0))) {
+ // first split is on rack2, contains file2
+ assertEquals(2, fileSplit.getNumPaths());
+ expected.clear();
+ expected.add(new Split(file2.getName(), BLOCKSIZE, 0));
+ expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE));
+ actual.clear();
+ for (int i = 0; i < 2; i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ assertTrue(actual.containsAll(expected));
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]);
+ }
+ if (split.equals(splits.get(1))) {
+ // second split is on rack1, contains file1
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ }
+ if (split.equals(splits.get(2))) {
+ // third split contains file3 and file4, however,
+ // the locations is undetermined.
+ assertEquals(6, fileSplit.getNumPaths());
+ expected.clear();
+ expected.add(new Split(file3.getName(), BLOCKSIZE, 0));
+ expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE));
+ expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE * 2));
+ expected.add(new Split(file4.getName(), BLOCKSIZE, 0));
+ expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE));
+ expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE * 2));
+ actual.clear();
+ for (int i = 0; i < 6; i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ assertTrue(actual.containsAll(expected));
+ assertEquals(1, fileSplit.getLocations().length);
+ }
+ } else {
+ fail("Split size should be 2 or 3.");
+ }
+ }
// measure performance when there are multiple pools and
// many files in each pool.
@@ -844,7 +1097,14 @@ public class TestCombineFileInputFormat {
assertEquals(3, nodeSplits.count(locations[1]));
}
+ /**
+ * The test suppresses unchecked warnings in
+ * {@link org.mockito.Mockito#reset}. Although calling the method is
+ * a bad manner, we call the method instead of splitting the test
+ * (i.e. restarting MiniDFSCluster) to save time.
+ */
@Test
+ @SuppressWarnings("unchecked")
public void testSplitPlacementForCompressedFiles() throws Exception {
MiniDFSCluster dfs = null;
FileSystem fileSys = null;
@@ -915,21 +1175,55 @@ public class TestCombineFileInputFormat {
for (InputSplit split : splits) {
System.out.println("File split(Test1): " + split);
}
- assertEquals(2, splits.size());
- fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f2.getLen(), fileSplit.getLength(0));
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f1.getLen(), fileSplit.getLength(0));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ Set<Split> expected = new HashSet<>();
+ expected.add(new Split(file1.getName(), f1.getLen(), 0));
+ expected.add(new Split(file2.getName(), f2.getLen(), 0));
+ List<Split> actual = new ArrayList<>();
+
+ /**
+ * If rack1 is processed first by
+ * {@link CombineFileInputFormat#createSplits},
+ * create only one split on rack1. Otherwise create two splits.
+ */
+ for (InputSplit split : splits) {
+ fileSplit = (CombineFileSplit) split;
+ if (splits.size() == 2) {
+ if (split.equals(splits.get(0))) {
+ // first split is on rack2, contains file2.
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]);
+ }
+ if (split.equals(splits.get(1))) {
+ // second split is on rack1, contains file1.
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ }
+ } else if (splits.size() == 1) {
+ // first split is on rack1, contains file1 and file2.
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ } else {
+ fail("Split size should be 1 or 2.");
+ }
+ for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ }
+ assertEquals(2, actual.size());
+ assertTrue(actual.containsAll(expected));
// create another file on 3 datanodes and 3 racks.
dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
@@ -943,28 +1237,83 @@ public class TestCombineFileInputFormat {
for (InputSplit split : splits) {
System.out.println("File split(Test2): " + split);
}
- assertEquals(3, splits.size());
- fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f3.getLen(), fileSplit.getLength(0));
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f2.getLen(), fileSplit.getLength(0));
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f1.getLen(), fileSplit.getLength(0));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ expected.add(new Split(file3.getName(), f3.getLen(), 0));
+ actual.clear();
+
+ for (InputSplit split : splits) {
+ fileSplit = (CombineFileSplit) split;
+ /**
+ * If rack1 is processed first by
+ * {@link CombineFileInputFormat#createSplits},
+ * create only one split on rack1.
+ * If rack2 or rack3 is processed first and rack1 is processed second,
+ * create one split on rack2 or rack3 and the other split is on rack1.
+ * Otherwise create 3 splits for each rack.
+ */
+ if (splits.size() == 3) {
+ // first split is on rack3, contains file3
+ if (split.equals(splits.get(0))) {
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+ assertEquals(f3.getLen(), fileSplit.getLength(0));
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+ }
+ // second split is on rack2, contains file2
+ if (split.equals(splits.get(1))) {
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]);
+ }
+ // third split is on rack1, contains file1
+ if (split.equals(splits.get(2))) {
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ }
+ } else if (splits.size() == 2) {
+ // first split is on rack2 or rack3, contains one or two files.
+ if (split.equals(splits.get(0))) {
+ assertEquals(1, fileSplit.getLocations().length);
+ if (fileSplit.getLocations()[0].equals(hosts2[0])) {
+ assertEquals(2, fileSplit.getNumPaths());
+ } else if (fileSplit.getLocations()[0].equals(hosts3[0])) {
+ assertEquals(1, fileSplit.getNumPaths());
+ } else {
+ fail("First split should be on rack2 or rack3.");
+ }
+ }
+ // second split is on rack1, contains the rest files.
+ if (split.equals(splits.get(1))) {
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ }
+ } else if (splits.size() == 1) {
+ // first split is rack1, contains all three files.
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(3, fileSplit.getNumPaths());
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ } else {
+ fail("Split size should be 1, 2, or 3.");
+ }
+ for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ }
+
+ assertEquals(3, actual.size());
+ assertTrue(actual.containsAll(expected));
// create file4 on all three racks
Path file4 = new Path(dir4 + "/file4.gz");
@@ -977,31 +1326,79 @@ public class TestCombineFileInputFormat {
for (InputSplit split : splits) {
System.out.println("File split(Test3): " + split);
}
- assertEquals(3, splits.size());
- fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f3.getLen(), fileSplit.getLength(0));
- assertEquals(file4.getName(), fileSplit.getPath(1).getName());
- assertEquals(0, fileSplit.getOffset(1));
- assertEquals(f4.getLen(), fileSplit.getLength(1));
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f2.getLen(), fileSplit.getLength(0));
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f1.getLen(), fileSplit.getLength(0));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ expected.add(new Split(file3.getName(), f3.getLen(), 0));
+ actual.clear();
+
+ for (InputSplit split : splits) {
+ fileSplit = (CombineFileSplit) split;
+ /**
+ * If rack1 is processed first by
+ * {@link CombineFileInputFormat#createSplits},
+ * create only one split on rack1.
+ * If rack2 or rack3 is processed first and rack1 is processed second,
+ * create one split on rack2 or rack3 and the other split is on rack1.
+ * Otherwise create 3 splits for each rack.
+ */
+ if (splits.size() == 3) {
+ // first split is on rack3, contains file3 and file4
+ if (split.equals(splits.get(0))) {
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+ }
+ // second split is on rack2, contains file2
+ if (split.equals(splits.get(1))) {
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+ assertEquals(f2.getLen(), fileSplit.getLength(0));
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]);
+ }
+ // third split is on rack1, contains file1
+ if (split.equals(splits.get(2))) {
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+ assertEquals(f1.getLen(), fileSplit.getLength(0));
+ assertEquals(0, fileSplit.getOffset(0));
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ }
+ } else if (splits.size() == 2) {
+ // first split is on rack2 or rack3, contains two or three files.
+ if (split.equals(splits.get(0))) {
+ assertEquals(1, fileSplit.getLocations().length);
+ if (fileSplit.getLocations()[0].equals(hosts2[0])) {
+ assertEquals(3, fileSplit.getNumPaths());
+ } else if (fileSplit.getLocations()[0].equals(hosts3[0])) {
+ assertEquals(2, fileSplit.getNumPaths());
+ } else {
+ fail("First split should be on rack2 or rack3.");
+ }
+ }
+ // second split is on rack1, contains the rest files.
+ if (split.equals(splits.get(1))) {
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ }
+ } else if (splits.size() == 1) {
+ // first split is rack1, contains all four files.
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(4, fileSplit.getNumPaths());
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ } else {
+ fail("Split size should be 1, 2, or 3.");
+ }
+ for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ }
+
+ assertEquals(4, actual.size());
+ assertTrue(actual.containsAll(expected));
// maximum split size is file1's length
inFormat = new DummyInputFormat();
@@ -1014,32 +1411,24 @@ public class TestCombineFileInputFormat {
System.out.println("File split(Test4): " + split);
}
assertEquals(4, splits.size());
- fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f3.getLen(), fileSplit.getLength(0));
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f2.getLen(), fileSplit.getLength(0));
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r3
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f1.getLen(), fileSplit.getLength(0));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits.get(3);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file4.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f4.getLen(), fileSplit.getLength(0));
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r1
+
+ actual.clear();
+ for (InputSplit split : splits) {
+ fileSplit = (CombineFileSplit) split;
+ for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ mockList.add(fileSplit.getLocations()[0]);
+ }
+
+ assertEquals(4, actual.size());
+ assertTrue(actual.containsAll(expected));
+ verify(mockList, atLeastOnce()).add(hosts1[0]);
+ verify(mockList, atLeastOnce()).add(hosts2[0]);
+ verify(mockList, atLeastOnce()).add(hosts3[0]);
// maximum split size is twice file1's length
inFormat = new DummyInputFormat();
@@ -1051,31 +1440,33 @@ public class TestCombineFileInputFormat {
for (InputSplit split : splits) {
System.out.println("File split(Test5): " + split);
}
- assertEquals(3, splits.size());
- fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f3.getLen(), fileSplit.getLength(0));
- assertEquals(file4.getName(), fileSplit.getPath(1).getName());
- assertEquals(0, fileSplit.getOffset(1));
- assertEquals(f4.getLen(), fileSplit.getLength(1));
- assertEquals(hosts3[0], fileSplit.getLocations()[0]);
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file2.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f2.getLen(), fileSplit.getLength(0));
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f1.getLen(), fileSplit.getLength(0));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ actual.clear();
+ reset(mockList);
+ for (InputSplit split : splits) {
+ fileSplit = (CombineFileSplit) split;
+ for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ mockList.add(fileSplit.getLocations()[0]);
+ }
+ assertEquals(4, actual.size());
+ assertTrue(actual.containsAll(expected));
+
+ if (splits.size() == 3) {
+ // splits are on all the racks
+ verify(mockList, times(1)).add(hosts1[0]);
+ verify(mockList, times(1)).add(hosts2[0]);
+ verify(mockList, times(1)).add(hosts3[0]);
+ } else if (splits.size() == 2) {
+ // one split is on rack1, another split is on rack2 or rack3
+ verify(mockList, times(1)).add(hosts1[0]);
+ } else {
+ fail("Split size should be 2 or 3.");
+ }
// maximum split size is 4 times file1's length
inFormat = new DummyInputFormat();
@@ -1087,26 +1478,29 @@ public class TestCombineFileInputFormat {
for (InputSplit split : splits) {
System.out.println("File split(Test6): " + split);
}
- assertEquals(2, splits.size());
- fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(file3.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f3.getLen(), fileSplit.getLength(0));
- assertEquals(file4.getName(), fileSplit.getPath(1).getName());
- assertEquals(0, fileSplit.getOffset(1));
- assertEquals(f4.getLen(), fileSplit.getLength(1));
- assertEquals(hosts3[0], fileSplit.getLocations()[0]);
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(file1.getName(), fileSplit.getPath(0).getName());
- assertEquals(0, fileSplit.getOffset(0));
- assertEquals(f1.getLen(), fileSplit.getLength(0));
- assertEquals(file2.getName(), fileSplit.getPath(1).getName());
- assertEquals(0, fileSplit.getOffset(1), BLOCKSIZE);
- assertEquals(f2.getLen(), fileSplit.getLength(1));
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+ /**
+ * If rack1 is processed first by
+ * {@link CombineFileInputFormat#createSplits},
+ * create only one split on rack1. Otherwise create two splits.
+ */
+ assertTrue("Split size should be 1 or 2.",
+ splits.size() == 1 || splits.size() == 2);
+ actual.clear();
+ reset(mockList);
+ for (InputSplit split : splits) {
+ fileSplit = (CombineFileSplit) split;
+ for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ mockList.add(fileSplit.getLocations()[0]);
+ }
+ assertEquals(4, actual.size());
+ assertTrue(actual.containsAll(expected));
+ verify(mockList, times(1)).add(hosts1[0]);
// maximum split size and min-split-size per rack is 4 times file1's length
inFormat = new DummyInputFormat();
@@ -1146,25 +1540,57 @@ public class TestCombineFileInputFormat {
inFormat = new DummyInputFormat();
FileInputFormat.addInputPath(job, inDir);
inFormat.setMinSplitSizeRack(1); // everything is at least rack local
- inFormat.createPool(new TestFilter(dir1),
- new TestFilter(dir2));
+ inFormat.createPool(new TestFilter(dir1),
+ new TestFilter(dir2));
splits = inFormat.getSplits(job);
for (InputSplit split : splits) {
System.out.println("File split(Test9): " + split);
}
- assertEquals(3, splits.size());
- fileSplit = (CombineFileSplit) splits.get(0);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
- fileSplit = (CombineFileSplit) splits.get(1);
- assertEquals(1, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
- fileSplit = (CombineFileSplit) splits.get(2);
- assertEquals(2, fileSplit.getNumPaths());
- assertEquals(1, fileSplit.getLocations().length);
- assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+
+ actual.clear();
+ for (InputSplit split : splits) {
+ fileSplit = (CombineFileSplit) split;
+ if (splits.size() == 3) {
+ // If rack2 is processed first
+ if (split.equals(splits.get(0))) {
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts2[0], fileSplit.getLocations()[0]);
+ }
+ if (split.equals(splits.get(1))) {
+ assertEquals(1, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ }
+ if (split.equals(splits.get(2))) {
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+ }
+ } else if (splits.size() == 2) {
+ // If rack1 is processed first
+ if (split.equals(splits.get(0))) {
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+ }
+ if (split.equals(splits.get(1))) {
+ assertEquals(2, fileSplit.getNumPaths());
+ assertEquals(1, fileSplit.getLocations().length);
+ assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+ }
+ } else {
+ fail("Split size should be 2 or 3.");
+ }
+ for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+ String name = fileSplit.getPath(i).getName();
+ long length = fileSplit.getLength(i);
+ long offset = fileSplit.getOffset(i);
+ actual.add(new Split(name, length, offset));
+ }
+ }
+ assertEquals(4, actual.size());
+ assertTrue(actual.containsAll(expected));
// measure performance when there are multiple pools and
// many files in each pool.