You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by oz...@apache.org on 2015/05/05 03:23:59 UTC

hadoop git commit: MAPREDUCE-6165. [JDK8] TestCombineFileInputFormat failed on JDK8. Contributed by Akira AJISAKA.

Repository: hadoop
Updated Branches:
  refs/heads/trunk d701acc9c -> 551615fa1


MAPREDUCE-6165. [JDK8] TestCombineFileInputFormat failed on JDK8. Contributed by Akira AJISAKA.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/551615fa
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/551615fa
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/551615fa

Branch: refs/heads/trunk
Commit: 551615fa13f65ae996bae9c1bacff189539b6557
Parents: d701acc
Author: Tsuyoshi Ozawa <oz...@apache.org>
Authored: Tue May 5 10:23:13 2015 +0900
Committer: Tsuyoshi Ozawa <oz...@apache.org>
Committed: Tue May 5 10:23:13 2015 +0900

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |    3 +
 .../lib/input/CombineFileInputFormat.java       |   26 +-
 .../lib/input/TestCombineFileInputFormat.java   | 1138 ++++++++++++------
 3 files changed, 805 insertions(+), 362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/551615fa/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 481757a..002fbe6 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -368,6 +368,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge
     (Gera Shegalov via jlowe)
 
+    MAPREDUCE-6165. [JDK8] TestCombineFileInputFormat failed on JDK8.
+    (Akira AJISAKA via ozawa)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/551615fa/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
index 040c54b..b2b7656 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
 import java.util.Set;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -289,6 +288,26 @@ public abstract class CombineFileInputFormat<K, V>
                  maxSize, minSizeNode, minSizeRack, splits);
   }
 
+  /**
+   * Process all the nodes and create splits that are local to a node.
+   * Generate one split per node iteration, and walk over nodes multiple times
+   * to distribute the splits across nodes.
+   * <p>
+   * Note: The order of processing the nodes is undetermined because the
+   * implementation of nodeToBlocks is {@link java.util.HashMap} and its order
+   * of the entries is undetermined.
+   * @param nodeToBlocks Mapping from a node to the list of blocks that
+   *                     it contains.
+   * @param blockToNodes Mapping from a block to the nodes on which
+   *                     it has replicas.
+   * @param rackToBlocks Mapping from a rack name to the list of blocks it has.
+   * @param totLength Total length of the input files.
+   * @param maxSize Max size of each split.
+   *                If set to 0, disable smoothing load.
+   * @param minSizeNode Minimum split size per node.
+   * @param minSizeRack Minimum split size per rack.
+   * @param splits New splits created by this method are added to the list.
+   */
   @VisibleForTesting
   void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
                      Map<OneBlockInfo, String[]> blockToNodes,
@@ -309,11 +328,6 @@ public abstract class CombineFileInputFormat<K, V>
     Set<String> completedNodes = new HashSet<String>();
     
     while(true) {
-      // it is allowed for maxSize to be 0. Disable smoothing load for such cases
-
-      // process all nodes and create splits that are local to a node. Generate
-      // one split per node iteration, and walk over nodes multiple times to
-      // distribute the splits across nodes. 
       for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
           .entrySet().iterator(); iter.hasNext();) {
         Map.Entry<String, Set<OneBlockInfo>> one = iter.next();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/551615fa/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
index 85c675c..b49f2d8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
@@ -22,6 +22,7 @@ import java.io.OutputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -53,13 +54,22 @@ import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneBlockInfo;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneFileInfo;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
 import com.google.common.collect.HashMultiset;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
 
 public class TestCombineFileInputFormat {
 
@@ -92,6 +102,14 @@ public class TestCombineFileInputFormat {
   static final int BLOCKSIZE = 1024;
   static final byte[] databuf = new byte[BLOCKSIZE];
 
+  @Mock
+  private List<String> mockList;
+
+  @Before
+  public void initMocks() {
+    MockitoAnnotations.initMocks(this);
+  }
+
   private static final String DUMMY_FS_URI = "dummyfs:///";
 
   /** Dummy class to extend CombineFileInputFormat*/
@@ -299,7 +317,51 @@ public class TestCombineFileInputFormat {
     assertFalse(rr.nextKeyValue());
   }
 
+  /**
+   * For testing each split has the expected name, length, and offset.
+   */
+  private final class Split {
+    private String name;
+    private long length;
+    private long offset;
+
+    public Split(String name, long length, long offset) {
+      this.name = name;
+      this.length = length;
+      this.offset = offset;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public long getLength() {
+      return length;
+    }
+
+    public long getOffset() {
+      return offset;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof Split) {
+        Split split = ((Split) obj);
+        return split.name.equals(name) && split.length == length
+            && split.offset == offset;
+      }
+      return false;
+    }
+  }
+
+  /**
+   * The test suppresses unchecked warnings in
+   * {@link org.mockito.Mockito#reset}. Although calling the method is
+   * a bad manner, we call the method instead of splitting the test
+   * (i.e. restarting MiniDFSCluster) to save time.
+   */
   @Test
+  @SuppressWarnings("unchecked")
   public void testSplitPlacement() throws Exception {
     MiniDFSCluster dfs = null;
     FileSystem fileSys = null;
@@ -326,10 +388,10 @@ public class TestCombineFileInputFormat {
         throw new IOException("Mkdirs failed to create " + inDir.toString());
       }
       Path file1 = new Path(dir1 + "/file1");
-      writeFile(conf, file1, (short)1, 1);
+      writeFile(conf, file1, (short) 1, 1);
       // create another file on the same datanode
       Path file5 = new Path(dir5 + "/file5");
-      writeFile(conf, file5, (short)1, 1);
+      writeFile(conf, file5, (short) 1, 1);
       // split it using a CombinedFile input format
       DummyInputFormat inFormat = new DummyInputFormat();
       Job job = Job.getInstance(conf);
@@ -350,13 +412,13 @@ public class TestCombineFileInputFormat {
       assertEquals(0, fileSplit.getOffset(1));
       assertEquals(BLOCKSIZE, fileSplit.getLength(1));
       assertEquals(hosts1[0], fileSplit.getLocations()[0]);
-      
+
       dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
       dfs.waitActive();
 
       // create file on two datanodes.
       Path file2 = new Path(dir2 + "/file2");
-      writeFile(conf, file2, (short)2, 2);
+      writeFile(conf, file2, (short) 2, 2);
 
       // split it using a CombinedFile input format
       inFormat = new DummyInputFormat();
@@ -365,34 +427,67 @@ public class TestCombineFileInputFormat {
       splits = inFormat.getSplits(job);
       System.out.println("Made splits(Test1): " + splits.size());
 
-      // make sure that each split has different locations
       for (InputSplit split : splits) {
         System.out.println("File split(Test1): " + split);
       }
-      assertEquals(2, splits.size());
-      fileSplit = (CombineFileSplit) splits.get(0);
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
-      assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      for (InputSplit split : splits) {
+        fileSplit = (CombineFileSplit) split;
+        /**
+         * If rack1 is processed first by
+         * {@link CombineFileInputFormat#createSplits},
+         * create only one split on rack1. Otherwise create two splits.
+         */
+        if (splits.size() == 2) {
+          // first split is on rack2, contains file2
+          if (split.equals(splits.get(0))) {
+            assertEquals(2, fileSplit.getNumPaths());
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+            assertEquals(0, fileSplit.getOffset(0));
+            assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+            assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+            assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+            assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+            assertEquals(hosts2[0], fileSplit.getLocations()[0]);
+          }
+          // second split is on rack1, contains file1
+          if (split.equals(splits.get(1))) {
+            assertEquals(1, fileSplit.getNumPaths());
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+            assertEquals(0, fileSplit.getOffset(0));
+            assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+          }
+        } else if (splits.size() == 1) {
+          // first split is on rack1, contains file1 and file2.
+          assertEquals(3, fileSplit.getNumPaths());
+          Set<Split> expected = new HashSet<>();
+          expected.add(new Split(file1.getName(), BLOCKSIZE, 0));
+          expected.add(new Split(file2.getName(), BLOCKSIZE, 0));
+          expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE));
+          List<Split> actual = new ArrayList<>();
+          for (int i = 0; i < 3; i++) {
+            String name = fileSplit.getPath(i).getName();
+            long length = fileSplit.getLength(i);
+            long offset = fileSplit.getOffset(i);
+            actual.add(new Split(name, length, offset));
+          }
+          assertTrue(actual.containsAll(expected));
+          assertEquals(1, fileSplit.getLocations().length);
+          assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+        } else {
+          fail("Expected split size is 1 or 2, but actual size is "
+              + splits.size());
+        }
+      }
 
       // create another file on 3 datanodes and 3 racks.
       dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
       dfs.waitActive();
       Path file3 = new Path(dir3 + "/file3");
-      writeFile(conf, new Path(dir3 + "/file3"), (short)3, 3);
+      writeFile(conf, new Path(dir3 + "/file3"), (short) 3, 3);
       inFormat = new DummyInputFormat();
       FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3);
       inFormat.setMinSplitSizeRack(BLOCKSIZE);
@@ -400,37 +495,98 @@ public class TestCombineFileInputFormat {
       for (InputSplit split : splits) {
         System.out.println("File split(Test2): " + split);
       }
-      assertEquals(3, splits.size());
-      fileSplit = (CombineFileSplit) splits.get(0);
-      assertEquals(3, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file3.getName(), fileSplit.getPath(1).getName());
-      assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals(file3.getName(), fileSplit.getPath(2).getName());
-      assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(2));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
-      fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
-      assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits.get(2);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      Set<Split> expected = new HashSet<>();
+      expected.add(new Split(file1.getName(), BLOCKSIZE, 0));
+      expected.add(new Split(file2.getName(), BLOCKSIZE, 0));
+      expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE));
+      expected.add(new Split(file3.getName(), BLOCKSIZE, 0));
+      expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE));
+      expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE * 2));
+      List<Split> actual = new ArrayList<>();
+
+      for (InputSplit split : splits) {
+        fileSplit = (CombineFileSplit) split;
+        /**
+         * If rack1 is processed first by
+         * {@link CombineFileInputFormat#createSplits},
+         * create only one split on rack1.
+         * If rack2 or rack3 is processed first and rack1 is processed second,
+         * create one split on rack2 or rack3 and the other split is on rack1.
+         * Otherwise create 3 splits for each rack.
+         */
+        if (splits.size() == 3) {
+          // first split is on rack3, contains file3
+          if (split.equals(splits.get(0))) {
+            assertEquals(3, fileSplit.getNumPaths());
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+            assertEquals(0, fileSplit.getOffset(0));
+            assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+            assertEquals(file3.getName(), fileSplit.getPath(1).getName());
+            assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+            assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+            assertEquals(file3.getName(), fileSplit.getPath(2).getName());
+            assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
+            assertEquals(BLOCKSIZE, fileSplit.getLength(2));
+            assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+          }
+          // second split is on rack2, contains file2
+          if (split.equals(splits.get(1))) {
+            assertEquals(2, fileSplit.getNumPaths());
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+            assertEquals(0, fileSplit.getOffset(0));
+            assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+            assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+            assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+            assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+            assertEquals(hosts2[0], fileSplit.getLocations()[0]);
+          }
+          // third split is on rack1, contains file1
+          if (split.equals(splits.get(2))) {
+            assertEquals(1, fileSplit.getNumPaths());
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+            assertEquals(0, fileSplit.getOffset(0));
+            assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+          }
+        } else if (splits.size() == 2) {
+          // first split is on rack2 or rack3, contains one or two files.
+          if (split.equals(splits.get(0))) {
+            assertEquals(1, fileSplit.getLocations().length);
+            if (fileSplit.getLocations()[0].equals(hosts2[0])) {
+              assertEquals(2, fileSplit.getNumPaths());
+            } else if (fileSplit.getLocations()[0].equals(hosts3[0])) {
+              assertEquals(3, fileSplit.getNumPaths());
+            } else {
+              fail("First split should be on rack2 or rack3.");
+            }
+          }
+          // second split is on rack1, contains the rest files.
+          if (split.equals(splits.get(1))) {
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+          }
+        } else if (splits.size() == 1) {
+          // first split is rack1, contains all three files.
+          assertEquals(1, fileSplit.getLocations().length);
+          assertEquals(6, fileSplit.getNumPaths());
+          assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+        } else {
+          fail("Split size should be 1, 2, or 3.");
+        }
+        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+          String name = fileSplit.getPath(i).getName();
+          long length = fileSplit.getLength(i);
+          long offset = fileSplit.getOffset(i);
+          actual.add(new Split(name, length, offset));
+        }
+      }
+
+      assertEquals(6, actual.size());
+      assertTrue(actual.containsAll(expected));
 
       // create file4 on all three racks
       Path file4 = new Path(dir4 + "/file4");
@@ -442,37 +598,85 @@ public class TestCombineFileInputFormat {
       for (InputSplit split : splits) {
         System.out.println("File split(Test3): " + split);
       }
-      assertEquals(3, splits.size());
-      fileSplit = (CombineFileSplit) splits.get(0);
-      assertEquals(6, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file3.getName(), fileSplit.getPath(1).getName());
-      assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals(file3.getName(), fileSplit.getPath(2).getName());
-      assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(2));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
-      fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
-      assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits.get(2);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      expected.add(new Split(file4.getName(), BLOCKSIZE, 0));
+      expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE));
+      expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE * 2));
+      actual.clear();
+
+      for (InputSplit split : splits) {
+        fileSplit = (CombineFileSplit) split;
+        /**
+         * If rack1 is processed first by
+         * {@link CombineFileInputFormat#createSplits},
+         * create only one split on rack1.
+         * If rack2 or rack3 is processed first and rack1 is processed second,
+         * create one split on rack2 or rack3 and the other split is on rack1.
+         * Otherwise create 3 splits for each rack.
+         */
+        if (splits.size() == 3) {
+          // first split is on rack3, contains file3 and file4
+          if (split.equals(splits.get(0))) {
+            assertEquals(6, fileSplit.getNumPaths());
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+          }
+          // second split is on rack2, contains file2
+          if (split.equals(splits.get(1))) {
+            assertEquals(2, fileSplit.getNumPaths());
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+            assertEquals(0, fileSplit.getOffset(0));
+            assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+            assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+            assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
+            assertEquals(BLOCKSIZE, fileSplit.getLength(1));
+            assertEquals(hosts2[0], fileSplit.getLocations()[0]);
+          }
+          // third split is on rack1, contains file1
+          if (split.equals(splits.get(2))) {
+            assertEquals(1, fileSplit.getNumPaths());
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+            assertEquals(0, fileSplit.getOffset(0));
+            assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+          }
+        } else if (splits.size() == 2) {
+          // first split is on rack2 or rack3, contains two or three files.
+          if (split.equals(splits.get(0))) {
+            assertEquals(1, fileSplit.getLocations().length);
+            if (fileSplit.getLocations()[0].equals(hosts2[0])) {
+              assertEquals(5, fileSplit.getNumPaths());
+            } else if (fileSplit.getLocations()[0].equals(hosts3[0])) {
+              assertEquals(6, fileSplit.getNumPaths());
+            } else {
+              fail("First split should be on rack2 or rack3.");
+            }
+          }
+          // second split is on rack1, contains the rest files.
+          if (split.equals(splits.get(1))) {
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+          }
+        } else if (splits.size() == 1) {
+          // first split is rack1, contains all four files.
+          assertEquals(1, fileSplit.getLocations().length);
+          assertEquals(9, fileSplit.getNumPaths());
+          assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+        } else {
+          fail("Split size should be 1, 2, or 3.");
+        }
+        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+          String name = fileSplit.getPath(i).getName();
+          long length = fileSplit.getLength(i);
+          long offset = fileSplit.getOffset(i);
+          actual.add(new Split(name, length, offset));
+        }
+      }
+
+      assertEquals(9, actual.size());
+      assertTrue(actual.containsAll(expected));
 
       // maximum split size is 2 blocks 
       inFormat = new DummyInputFormat();
@@ -485,34 +689,26 @@ public class TestCombineFileInputFormat {
         System.out.println("File split(Test4): " + split);
       }
       assertEquals(5, splits.size());
-      fileSplit = (CombineFileSplit) splits.get(0);
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file3.getName(), fileSplit.getPath(1).getName());
-      assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
-      fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
-      assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
-      fileSplit = (CombineFileSplit) splits.get(2);
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file3.getName(), fileSplit.getPath(1).getName());
-      assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(1));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
+
+      actual.clear();
+      reset(mockList);
+      for (InputSplit split : splits) {
+        fileSplit = (CombineFileSplit) split;
+        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+          String name = fileSplit.getPath(i).getName();
+          long length = fileSplit.getLength(i);
+          long offset = fileSplit.getOffset(i);
+          actual.add(new Split(name, length, offset));
+        }
+        mockList.add(fileSplit.getLocations()[0]);
+      }
+
+      assertEquals(9, actual.size());
+      assertTrue(actual.containsAll(expected));
+      // verify the splits are on all the racks
+      verify(mockList, atLeastOnce()).add(hosts1[0]);
+      verify(mockList, atLeastOnce()).add(hosts2[0]);
+      verify(mockList, atLeastOnce()).add(hosts3[0]);
 
       // maximum split size is 3 blocks 
       inFormat = new DummyInputFormat();
@@ -524,44 +720,26 @@ public class TestCombineFileInputFormat {
       for (InputSplit split : splits) {
         System.out.println("File split(Test5): " + split);
       }
+
       assertEquals(3, splits.size());
-      fileSplit = (CombineFileSplit) splits.get(0);
-      assertEquals(3, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file3.getName(), fileSplit.getPath(1).getName());
-      assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals(file3.getName(), fileSplit.getPath(2).getName());
-      assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(2));
-      assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
-      fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
-      assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals(file4.getName(), fileSplit.getPath(2).getName());
-      assertEquals(0, fileSplit.getOffset(2));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(2));
-      assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
-      fileSplit = (CombineFileSplit) splits.get(2);
-      assertEquals(3, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
-      assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals(file4.getName(), fileSplit.getPath(2).getName());
-      assertEquals(2*BLOCKSIZE, fileSplit.getOffset(2));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(2));
-      assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
+
+      actual.clear();
+      reset(mockList);
+      for (InputSplit split : splits) {
+        fileSplit = (CombineFileSplit) split;
+        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+          String name = fileSplit.getPath(i).getName();
+          long length = fileSplit.getLength(i);
+          long offset = fileSplit.getOffset(i);
+          actual.add(new Split(name, length, offset));
+        }
+        mockList.add(fileSplit.getLocations()[0]);
+      }
+
+      assertEquals(9, actual.size());
+      assertTrue(actual.containsAll(expected));
+      verify(mockList, atLeastOnce()).add(hosts1[0]);
+      verify(mockList, atLeastOnce()).add(hosts2[0]);
 
       // maximum split size is 4 blocks 
       inFormat = new DummyInputFormat();
@@ -572,41 +750,23 @@ public class TestCombineFileInputFormat {
         System.out.println("File split(Test6): " + split);
       }
       assertEquals(3, splits.size());
-      fileSplit = (CombineFileSplit) splits.get(0);
-      assertEquals(4, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file3.getName(), fileSplit.getPath(1).getName());
-      assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals(file3.getName(), fileSplit.getPath(2).getName());
-      assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(2));
-      assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
-      fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(4, fileSplit.getNumPaths());
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
-      assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals(file4.getName(), fileSplit.getPath(2).getName());
-      assertEquals(BLOCKSIZE, fileSplit.getOffset(2));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(2));
-      assertEquals(file4.getName(), fileSplit.getPath(3).getName());
-      assertEquals( 2 * BLOCKSIZE, fileSplit.getOffset(3));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(3));
-      assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
-      fileSplit = (CombineFileSplit) splits.get(2);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      actual.clear();
+      reset(mockList);
+      for (InputSplit split : splits) {
+        fileSplit = (CombineFileSplit) split;
+        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+          String name = fileSplit.getPath(i).getName();
+          long length = fileSplit.getLength(i);
+          long offset = fileSplit.getOffset(i);
+          actual.add(new Split(name, length, offset));
+        }
+        mockList.add(fileSplit.getLocations()[0]);
+      }
+
+      assertEquals(9, actual.size());
+      assertTrue(actual.containsAll(expected));
+      verify(mockList, atLeastOnce()).add(hosts1[0]);
 
       // maximum split size is 7 blocks and min is 3 blocks
       inFormat = new DummyInputFormat();
@@ -619,20 +779,31 @@ public class TestCombineFileInputFormat {
       for (InputSplit split : splits) {
         System.out.println("File split(Test7): " + split);
       }
+
       assertEquals(2, splits.size());
-      fileSplit = (CombineFileSplit) splits.get(0);
-      assertEquals(6, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
-      fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(3, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
+
+      actual.clear();
+      reset(mockList);
+      for (InputSplit split : splits) {
+        fileSplit = (CombineFileSplit) split;
+        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+          String name = fileSplit.getPath(i).getName();
+          long length = fileSplit.getLength(i);
+          long offset = fileSplit.getOffset(i);
+          actual.add(new Split(name, length, offset));
+        }
+        mockList.add(fileSplit.getLocations()[0]);
+      }
+
+      assertEquals(9, actual.size());
+      assertTrue(actual.containsAll(expected));
+      verify(mockList, atLeastOnce()).add(hosts1[0]);
 
       // Rack 1 has file1, file2 and file3 and file4
       // Rack 2 has file2 and file3 and file4
       // Rack 3 has file3 and file4
-      // setup a filter so that only file1 and file2 can be combined
+      // setup a filter so that only (file1 and file2) or (file3 and file4)
+      // can be combined
       inFormat = new DummyInputFormat();
       FileInputFormat.addInputPath(job, inDir);
       inFormat.setMinSplitSizeRack(1); // everything is at least rack local
@@ -642,19 +813,101 @@ public class TestCombineFileInputFormat {
       for (InputSplit split : splits) {
         System.out.println("File split(Test1): " + split);
       }
-      assertEquals(3, splits.size());
-      fileSplit = (CombineFileSplit) splits.get(0);
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
-      fileSplit = (CombineFileSplit) splits.get(2);
-      assertEquals(6, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+
+      for (InputSplit split : splits) {
+        fileSplit = (CombineFileSplit) split;
+        if (splits.size() == 2) {
+          // first split is on rack1, contains file1 and file2.
+          if (split.equals(splits.get(0))) {
+            assertEquals(3, fileSplit.getNumPaths());
+            expected.clear();
+            expected.add(new Split(file1.getName(), BLOCKSIZE, 0));
+            expected.add(new Split(file2.getName(), BLOCKSIZE, 0));
+            expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE));
+            actual.clear();
+            for (int i = 0; i < 3; i++) {
+              String name = fileSplit.getPath(i).getName();
+              long length = fileSplit.getLength(i);
+              long offset = fileSplit.getOffset(i);
+              actual.add(new Split(name, length, offset));
+            }
+            assertTrue(actual.containsAll(expected));
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+          }
+          if (split.equals(splits.get(1))) {
+            // second split contains the file3 and file4, however,
+            // the locations is undetermined.
+            assertEquals(6, fileSplit.getNumPaths());
+            expected.clear();
+            expected.add(new Split(file3.getName(), BLOCKSIZE, 0));
+            expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE));
+            expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE * 2));
+            expected.add(new Split(file4.getName(), BLOCKSIZE, 0));
+            expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE));
+            expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE * 2));
+            actual.clear();
+            for (int i = 0; i < 6; i++) {
+              String name = fileSplit.getPath(i).getName();
+              long length = fileSplit.getLength(i);
+              long offset = fileSplit.getOffset(i);
+              actual.add(new Split(name, length, offset));
+            }
+            assertTrue(actual.containsAll(expected));
+            assertEquals(1, fileSplit.getLocations().length);
+          }
+        } else if (splits.size() == 3) {
+          if (split.equals(splits.get(0))) {
+            // first split is on rack2, contains file2
+            assertEquals(2, fileSplit.getNumPaths());
+            expected.clear();
+            expected.add(new Split(file2.getName(), BLOCKSIZE, 0));
+            expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE));
+            actual.clear();
+            for (int i = 0; i < 2; i++) {
+              String name = fileSplit.getPath(i).getName();
+              long length = fileSplit.getLength(i);
+              long offset = fileSplit.getOffset(i);
+              actual.add(new Split(name, length, offset));
+            }
+            assertTrue(actual.containsAll(expected));
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts2[0], fileSplit.getLocations()[0]);
+          }
+          if (split.equals(splits.get(1))) {
+            // second split is on rack1, contains file1
+            assertEquals(1, fileSplit.getNumPaths());
+            assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+            assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+            assertEquals(0, fileSplit.getOffset(0));
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+          }
+          if (split.equals(splits.get(2))) {
+            // third split contains file3 and file4, however,
+            // the locations is undetermined.
+            assertEquals(6, fileSplit.getNumPaths());
+            expected.clear();
+            expected.add(new Split(file3.getName(), BLOCKSIZE, 0));
+            expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE));
+            expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE * 2));
+            expected.add(new Split(file4.getName(), BLOCKSIZE, 0));
+            expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE));
+            expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE * 2));
+            actual.clear();
+            for (int i = 0; i < 6; i++) {
+              String name = fileSplit.getPath(i).getName();
+              long length = fileSplit.getLength(i);
+              long offset = fileSplit.getOffset(i);
+              actual.add(new Split(name, length, offset));
+            }
+            assertTrue(actual.containsAll(expected));
+            assertEquals(1, fileSplit.getLocations().length);
+          }
+        } else {
+          fail("Split size should be 2 or 3.");
+        }
+      }
 
       // measure performance when there are multiple pools and
       // many files in each pool.
@@ -844,7 +1097,14 @@ public class TestCombineFileInputFormat {
     assertEquals(3, nodeSplits.count(locations[1]));
   }
 
+  /**
+   * The test suppresses unchecked warnings in
+   * {@link org.mockito.Mockito#reset}. Although calling the method is
+   * a bad manner, we call the method instead of splitting the test
+   * (i.e. restarting MiniDFSCluster) to save time.
+   */
   @Test
+  @SuppressWarnings("unchecked")
   public void testSplitPlacementForCompressedFiles() throws Exception {
     MiniDFSCluster dfs = null;
     FileSystem fileSys = null;
@@ -915,21 +1175,55 @@ public class TestCombineFileInputFormat {
       for (InputSplit split : splits) {
         System.out.println("File split(Test1): " + split);
       }
-      assertEquals(2, splits.size());
-      fileSplit = (CombineFileSplit) splits.get(0);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f2.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f1.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      Set<Split> expected = new HashSet<>();
+      expected.add(new Split(file1.getName(), f1.getLen(), 0));
+      expected.add(new Split(file2.getName(), f2.getLen(), 0));
+      List<Split> actual = new ArrayList<>();
+
+      /**
+       * If rack1 is processed first by
+       * {@link CombineFileInputFormat#createSplits},
+       * create only one split on rack1. Otherwise create two splits.
+       */
+      for (InputSplit split : splits) {
+        fileSplit = (CombineFileSplit) split;
+        if (splits.size() == 2) {
+          if (split.equals(splits.get(0))) {
+            // first split is on rack2, contains file2.
+            assertEquals(1, fileSplit.getNumPaths());
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+            assertEquals(0, fileSplit.getOffset(0));
+            assertEquals(f2.getLen(), fileSplit.getLength(0));
+            assertEquals(hosts2[0], fileSplit.getLocations()[0]);
+          }
+          if (split.equals(splits.get(1))) {
+            // second split is on rack1, contains file1.
+            assertEquals(1, fileSplit.getNumPaths());
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+            assertEquals(0, fileSplit.getOffset(0));
+            assertEquals(f1.getLen(), fileSplit.getLength(0));
+            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+          }
+        } else if (splits.size() == 1) {
+          // first split is on rack1, contains file1 and file2.
+          assertEquals(2, fileSplit.getNumPaths());
+          assertEquals(1, fileSplit.getLocations().length);
+          assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+        } else {
+          fail("Split size should be 1 or 2.");
+        }
+        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+          String name = fileSplit.getPath(i).getName();
+          long length = fileSplit.getLength(i);
+          long offset = fileSplit.getOffset(i);
+          actual.add(new Split(name, length, offset));
+        }
+      }
+      assertEquals(2, actual.size());
+      assertTrue(actual.containsAll(expected));
 
       // create another file on 3 datanodes and 3 racks.
       dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
@@ -943,28 +1237,83 @@ public class TestCombineFileInputFormat {
       for (InputSplit split : splits) {
         System.out.println("File split(Test2): " + split);
       }
-      assertEquals(3, splits.size());
-      fileSplit = (CombineFileSplit) splits.get(0);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f3.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
-      fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f2.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits.get(2);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f1.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      expected.add(new Split(file3.getName(), f3.getLen(), 0));
+      actual.clear();
+
+      for (InputSplit split : splits) {
+        fileSplit = (CombineFileSplit) split;
+        /**
+         * If rack1 is processed first by
+         * {@link CombineFileInputFormat#createSplits},
+         * create only one split on rack1.
+         * If rack2 or rack3 is processed first and rack1 is processed second,
+         * create one split on rack2 or rack3 and the other split is on rack1.
+         * Otherwise create 3 splits for each rack.
+         */
+        if (splits.size() == 3) {
+          // first split is on rack3, contains file3
+          if (split.equals(splits.get(0))) {
+            assertEquals(1, fileSplit.getNumPaths());
+            assertEquals(file3.getName(), fileSplit.getPath(0).getName());
+            assertEquals(f3.getLen(), fileSplit.getLength(0));
+            assertEquals(0, fileSplit.getOffset(0));
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+          }
+          // second split is on rack2, contains file2
+          if (split.equals(splits.get(1))) {
+            assertEquals(1, fileSplit.getNumPaths());
+            assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+            assertEquals(f2.getLen(), fileSplit.getLength(0));
+            assertEquals(0, fileSplit.getOffset(0));
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts2[0], fileSplit.getLocations()[0]);
+          }
+          // third split is on rack1, contains file1
+          if (split.equals(splits.get(2))) {
+            assertEquals(1, fileSplit.getNumPaths());
+            assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+            assertEquals(f1.getLen(), fileSplit.getLength(0));
+            assertEquals(0, fileSplit.getOffset(0));
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+          }
+        } else if (splits.size() == 2) {
+          // first split is on rack2 or rack3, contains one or two files.
+          if (split.equals(splits.get(0))) {
+            assertEquals(1, fileSplit.getLocations().length);
+            if (fileSplit.getLocations()[0].equals(hosts2[0])) {
+              assertEquals(2, fileSplit.getNumPaths());
+            } else if (fileSplit.getLocations()[0].equals(hosts3[0])) {
+              assertEquals(1, fileSplit.getNumPaths());
+            } else {
+              fail("First split should be on rack2 or rack3.");
+            }
+          }
+          // second split is on rack1, contains the rest files.
+          if (split.equals(splits.get(1))) {
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+          }
+        } else if (splits.size() == 1) {
+          // first split is rack1, contains all three files.
+          assertEquals(1, fileSplit.getLocations().length);
+          assertEquals(3, fileSplit.getNumPaths());
+          assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+        } else {
+          fail("Split size should be 1, 2, or 3.");
+        }
+        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+          String name = fileSplit.getPath(i).getName();
+          long length = fileSplit.getLength(i);
+          long offset = fileSplit.getOffset(i);
+          actual.add(new Split(name, length, offset));
+        }
+      }
+
+      assertEquals(3, actual.size());
+      assertTrue(actual.containsAll(expected));
 
       // create file4 on all three racks
       Path file4 = new Path(dir4 + "/file4.gz");
@@ -977,31 +1326,79 @@ public class TestCombineFileInputFormat {
       for (InputSplit split : splits) {
         System.out.println("File split(Test3): " + split);
       }
-      assertEquals(3, splits.size());
-      fileSplit = (CombineFileSplit) splits.get(0);
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f3.getLen(), fileSplit.getLength(0));
-      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
-      assertEquals(0, fileSplit.getOffset(1));
-      assertEquals(f4.getLen(), fileSplit.getLength(1));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
-      fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f2.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits.get(2);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f1.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      expected.add(new Split(file3.getName(), f3.getLen(), 0));
+      actual.clear();
+
+      for (InputSplit split : splits) {
+        fileSplit = (CombineFileSplit) split;
+        /**
+         * If rack1 is processed first by
+         * {@link CombineFileInputFormat#createSplits},
+         * create only one split on rack1.
+         * If rack2 or rack3 is processed first and rack1 is processed second,
+         * create one split on rack2 or rack3 and the other split is on rack1.
+         * Otherwise create 3 splits for each rack.
+         */
+        if (splits.size() == 3) {
+          // first split is on rack3, contains file3 and file4
+          if (split.equals(splits.get(0))) {
+            assertEquals(2, fileSplit.getNumPaths());
+            assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+          }
+          // second split is on rack2, contains file2
+          if (split.equals(splits.get(1))) {
+            assertEquals(1, fileSplit.getNumPaths());
+            assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+            assertEquals(f2.getLen(), fileSplit.getLength(0));
+            assertEquals(0, fileSplit.getOffset(0));
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts2[0], fileSplit.getLocations()[0]);
+          }
+          // third split is on rack1, contains file1
+          if (split.equals(splits.get(2))) {
+            assertEquals(1, fileSplit.getNumPaths());
+            assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+            assertEquals(f1.getLen(), fileSplit.getLength(0));
+            assertEquals(0, fileSplit.getOffset(0));
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+          }
+        } else if (splits.size() == 2) {
+          // first split is on rack2 or rack3, contains two or three files.
+          if (split.equals(splits.get(0))) {
+            assertEquals(1, fileSplit.getLocations().length);
+            if (fileSplit.getLocations()[0].equals(hosts2[0])) {
+              assertEquals(3, fileSplit.getNumPaths());
+            } else if (fileSplit.getLocations()[0].equals(hosts3[0])) {
+              assertEquals(2, fileSplit.getNumPaths());
+            } else {
+              fail("First split should be on rack2 or rack3.");
+            }
+          }
+          // second split is on rack1, contains the rest files.
+          if (split.equals(splits.get(1))) {
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+          }
+        } else if (splits.size() == 1) {
+          // first split is rack1, contains all four files.
+          assertEquals(1, fileSplit.getLocations().length);
+          assertEquals(4, fileSplit.getNumPaths());
+          assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+        } else {
+          fail("Split size should be 1, 2, or 3.");
+        }
+        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+          String name = fileSplit.getPath(i).getName();
+          long length = fileSplit.getLength(i);
+          long offset = fileSplit.getOffset(i);
+          actual.add(new Split(name, length, offset));
+        }
+      }
+
+      assertEquals(4, actual.size());
+      assertTrue(actual.containsAll(expected));
 
       // maximum split size is file1's length
       inFormat = new DummyInputFormat();
@@ -1014,32 +1411,24 @@ public class TestCombineFileInputFormat {
         System.out.println("File split(Test4): " + split);
       }
       assertEquals(4, splits.size());
-      fileSplit = (CombineFileSplit) splits.get(0);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f3.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
-      fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f2.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r3
-      fileSplit = (CombineFileSplit) splits.get(2);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f1.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits.get(3);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f4.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r1
+
+      actual.clear();
+      for (InputSplit split : splits) {
+        fileSplit = (CombineFileSplit) split;
+        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+          String name = fileSplit.getPath(i).getName();
+          long length = fileSplit.getLength(i);
+          long offset = fileSplit.getOffset(i);
+          actual.add(new Split(name, length, offset));
+        }
+        mockList.add(fileSplit.getLocations()[0]);
+      }
+
+      assertEquals(4, actual.size());
+      assertTrue(actual.containsAll(expected));
+      verify(mockList, atLeastOnce()).add(hosts1[0]);
+      verify(mockList, atLeastOnce()).add(hosts2[0]);
+      verify(mockList, atLeastOnce()).add(hosts3[0]);
 
       // maximum split size is twice file1's length
       inFormat = new DummyInputFormat();
@@ -1051,31 +1440,33 @@ public class TestCombineFileInputFormat {
       for (InputSplit split : splits) {
         System.out.println("File split(Test5): " + split);
       }
-      assertEquals(3, splits.size());
-      fileSplit = (CombineFileSplit) splits.get(0);
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f3.getLen(), fileSplit.getLength(0));
-      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
-      assertEquals(0, fileSplit.getOffset(1));
-      assertEquals(f4.getLen(), fileSplit.getLength(1));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]);
-      fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f2.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits.get(2);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f1.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      actual.clear();
+      reset(mockList);
+      for (InputSplit split : splits) {
+        fileSplit = (CombineFileSplit) split;
+        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+          String name = fileSplit.getPath(i).getName();
+          long length = fileSplit.getLength(i);
+          long offset = fileSplit.getOffset(i);
+          actual.add(new Split(name, length, offset));
+        }
+        mockList.add(fileSplit.getLocations()[0]);
+      }
+      assertEquals(4, actual.size());
+      assertTrue(actual.containsAll(expected));
+
+      if (splits.size() == 3) {
+        // splits are on all the racks
+        verify(mockList, times(1)).add(hosts1[0]);
+        verify(mockList, times(1)).add(hosts2[0]);
+        verify(mockList, times(1)).add(hosts3[0]);
+      } else if (splits.size() == 2) {
+        // one split is on rack1, another split is on rack2 or rack3
+        verify(mockList, times(1)).add(hosts1[0]);
+      } else {
+        fail("Split size should be 2 or 3.");
+      }
 
       // maximum split size is 4 times file1's length 
       inFormat = new DummyInputFormat();
@@ -1087,26 +1478,29 @@ public class TestCombineFileInputFormat {
       for (InputSplit split : splits) {
         System.out.println("File split(Test6): " + split);
       }
-      assertEquals(2, splits.size());
-      fileSplit = (CombineFileSplit) splits.get(0);
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f3.getLen(), fileSplit.getLength(0));
-      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
-      assertEquals(0, fileSplit.getOffset(1));
-      assertEquals(f4.getLen(), fileSplit.getLength(1));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]);
-      fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f1.getLen(), fileSplit.getLength(0));
-      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
-      assertEquals(0, fileSplit.getOffset(1), BLOCKSIZE);
-      assertEquals(f2.getLen(), fileSplit.getLength(1));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+
+      /**
+       * If rack1 is processed first by
+       * {@link CombineFileInputFormat#createSplits},
+       * create only one split on rack1. Otherwise create two splits.
+       */
+      assertTrue("Split size should be 1 or 2.",
+          splits.size() == 1 || splits.size() == 2);
+      actual.clear();
+      reset(mockList);
+      for (InputSplit split : splits) {
+        fileSplit = (CombineFileSplit) split;
+        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+          String name = fileSplit.getPath(i).getName();
+          long length = fileSplit.getLength(i);
+          long offset = fileSplit.getOffset(i);
+          actual.add(new Split(name, length, offset));
+        }
+        mockList.add(fileSplit.getLocations()[0]);
+      }
+      assertEquals(4, actual.size());
+      assertTrue(actual.containsAll(expected));
+      verify(mockList, times(1)).add(hosts1[0]);
 
       // maximum split size and min-split-size per rack is 4 times file1's length
       inFormat = new DummyInputFormat();
@@ -1146,25 +1540,57 @@ public class TestCombineFileInputFormat {
       inFormat = new DummyInputFormat();
       FileInputFormat.addInputPath(job, inDir);
       inFormat.setMinSplitSizeRack(1); // everything is at least rack local
-      inFormat.createPool(new TestFilter(dir1), 
-                          new TestFilter(dir2));
+      inFormat.createPool(new TestFilter(dir1),
+          new TestFilter(dir2));
       splits = inFormat.getSplits(job);
       for (InputSplit split : splits) {
         System.out.println("File split(Test9): " + split);
       }
-      assertEquals(3, splits.size());
-      fileSplit = (CombineFileSplit) splits.get(0);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
-      fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
-      fileSplit = (CombineFileSplit) splits.get(2);
-      assertEquals(2, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+
+      actual.clear();
+      for (InputSplit split : splits) {
+        fileSplit = (CombineFileSplit) split;
+        if (splits.size() == 3) {
+          // If rack2 is processed first
+          if (split.equals(splits.get(0))) {
+            assertEquals(1, fileSplit.getNumPaths());
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts2[0], fileSplit.getLocations()[0]);
+          }
+          if (split.equals(splits.get(1))) {
+            assertEquals(1, fileSplit.getNumPaths());
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+          }
+          if (split.equals(splits.get(2))) {
+            assertEquals(2, fileSplit.getNumPaths());
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+          }
+        } else if (splits.size() == 2) {
+          // If rack1 is processed first
+          if (split.equals(splits.get(0))) {
+            assertEquals(2, fileSplit.getNumPaths());
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts1[0], fileSplit.getLocations()[0]);
+          }
+          if (split.equals(splits.get(1))) {
+            assertEquals(2, fileSplit.getNumPaths());
+            assertEquals(1, fileSplit.getLocations().length);
+            assertEquals(hosts3[0], fileSplit.getLocations()[0]);
+          }
+        } else {
+          fail("Split size should be 2 or 3.");
+        }
+        for (int i = 0; i < fileSplit.getNumPaths(); i++) {
+          String name = fileSplit.getPath(i).getName();
+          long length = fileSplit.getLength(i);
+          long offset = fileSplit.getOffset(i);
+          actual.add(new Split(name, length, offset));
+        }
+      }
+      assertEquals(4, actual.size());
+      assertTrue(actual.containsAll(expected));
 
       // measure performance when there are multiple pools and
       // many files in each pool.