You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2013/07/26 20:22:26 UTC

svn commit: r1507388 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/o...

Author: jlowe
Date: Fri Jul 26 18:22:26 2013
New Revision: 1507388

URL: http://svn.apache.org/r1507388
Log:
MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus. Contributed by Hairong Kuang and Jason Lowe

Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java   (with props)
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1507388&r1=1507387&r2=1507388&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Jul 26 18:22:26 2013
@@ -10,6 +10,9 @@ Release 0.23.10 - UNRELEASED
 
   OPTIMIZATIONS
 
+    MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus
+    (Hairong Kuang and Jason Lowe via jlowe)
+
   BUG FIXES
 
     MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1507388&r1=1507387&r2=1507388&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Fri Jul 26 18:22:26 2013
@@ -36,8 +36,10 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
@@ -164,13 +166,17 @@ public abstract class FileInputFormat<K,
   protected void addInputPathRecursively(List<FileStatus> result,
       FileSystem fs, Path path, PathFilter inputFilter) 
       throws IOException {
-    for(FileStatus stat: fs.listStatus(path, inputFilter)) {
-      if (stat.isDirectory()) {
-        addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
-      } else {
-        result.add(stat);
+    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+    while (iter.hasNext()) {
+      LocatedFileStatus stat = iter.next();
+      if (inputFilter.accept(stat.getPath())) {
+        if (stat.isDirectory()) {
+          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+        } else {
+          result.add(stat);
+        }
       }
-    }          
+    }
   }
   
   /** List input directories.
@@ -216,14 +222,19 @@ public abstract class FileInputFormat<K,
       } else {
         for (FileStatus globStat: matches) {
           if (globStat.isDirectory()) {
-            for(FileStatus stat: fs.listStatus(globStat.getPath(),
-                inputFilter)) {
-              if (recursive && stat.isDirectory()) {
-                addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
-              } else {
-                result.add(stat);
+            RemoteIterator<LocatedFileStatus> iter =
+                fs.listLocatedStatus(globStat.getPath());
+            while (iter.hasNext()) {
+              LocatedFileStatus stat = iter.next();
+              if (inputFilter.accept(stat.getPath())) {
+                if (recursive && stat.isDirectory()) {
+                  addInputPathRecursively(result, fs, stat.getPath(),
+                      inputFilter);
+                } else {
+                  result.add(stat);
+                }
               }
-            }          
+            }
           } else {
             result.add(globStat);
           }
@@ -249,7 +260,6 @@ public abstract class FileInputFormat<K,
 
   /** Splits files returned by {@link #listStatus(JobConf)} when
    * they're too big.*/ 
-  @SuppressWarnings("deprecation")
   public InputSplit[] getSplits(JobConf job, int numSplits)
     throws IOException {
     FileStatus[] files = listStatus(job);
@@ -273,31 +283,38 @@ public abstract class FileInputFormat<K,
     NetworkTopology clusterMap = new NetworkTopology();
     for (FileStatus file: files) {
       Path path = file.getPath();
-      FileSystem fs = path.getFileSystem(job);
       long length = file.getLen();
-      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
-      if ((length != 0) && isSplitable(fs, path)) { 
-        long blockSize = file.getBlockSize();
-        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
-
-        long bytesRemaining = length;
-        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
-          String[] splitHosts = getSplitHosts(blkLocations, 
-              length-bytesRemaining, splitSize, clusterMap);
-          splits.add(makeSplit(path, length-bytesRemaining, splitSize, 
-                               splitHosts));
-          bytesRemaining -= splitSize;
+      if (length != 0) {
+        FileSystem fs = path.getFileSystem(job);
+        BlockLocation[] blkLocations;
+        if (file instanceof LocatedFileStatus) {
+          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+        } else {
+          blkLocations = fs.getFileBlockLocations(file, 0, length);
+        }
+        if (isSplitable(fs, path)) {
+          long blockSize = file.getBlockSize();
+          long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+
+          long bytesRemaining = length;
+          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
+            String[] splitHosts = getSplitHosts(blkLocations,
+                length-bytesRemaining, splitSize, clusterMap);
+            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
+                splitHosts));
+            bytesRemaining -= splitSize;
+          }
+
+          if (bytesRemaining != 0) {
+            String[] splitHosts = getSplitHosts(blkLocations, length
+                - bytesRemaining, bytesRemaining, clusterMap);
+            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
+                splitHosts));
+          }
+        } else {
+          String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
+          splits.add(makeSplit(path, 0, length, splitHosts));
         }
-        
-        if (bytesRemaining != 0) {
-          String[] splitHosts = getSplitHosts(blkLocations, length
-              - bytesRemaining, bytesRemaining, clusterMap);
-          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
-              splitHosts));
-        }
-      } else if (length != 0) {
-        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
-        splits.add(makeSplit(path, 0, length, splitHosts));
       } else { 
         //Create empty hosts array for zero length files
         splits.add(makeSplit(path, 0, length, new String[0]));

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1507388&r1=1507387&r2=1507388&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Fri Jul 26 18:22:26 2013
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.lib.
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.HashMap;
@@ -33,7 +32,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -203,47 +202,33 @@ public abstract class CombineFileInputFo
     }
 
     // all the files in input set
-    Path[] paths = FileUtil.stat2Paths(
-                     listStatus(job).toArray(new FileStatus[0]));
+    List<FileStatus> stats = listStatus(job);
     List<InputSplit> splits = new ArrayList<InputSplit>();
-    if (paths.length == 0) {
+    if (stats.size() == 0) {
       return splits;    
     }
 
-    // Convert them to Paths first. This is a costly operation and 
-    // we should do it first, otherwise we will incur doing it multiple
-    // times, one time each for each pool in the next loop.
-    List<Path> newpaths = new LinkedList<Path>();
-    for (int i = 0; i < paths.length; i++) {
-      FileSystem fs = paths[i].getFileSystem(conf);
-      Path p = fs.makeQualified(paths[i]);
-      newpaths.add(p);
-    }
-    paths = null;
-
     // In one single iteration, process all the paths in a single pool.
     // Processing one pool at a time ensures that a split contains paths
     // from a single pool only.
     for (MultiPathFilter onepool : pools) {
-      ArrayList<Path> myPaths = new ArrayList<Path>();
+      ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();
       
       // pick one input path. If it matches all the filters in a pool,
       // add it to the output set
-      for (Iterator<Path> iter = newpaths.iterator(); iter.hasNext();) {
-        Path p = iter.next();
-        if (onepool.accept(p)) {
+      for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
+        FileStatus p = iter.next();
+        if (onepool.accept(p.getPath())) {
           myPaths.add(p); // add it to my output set
           iter.remove();
         }
       }
       // create splits for all files in this pool.
-      getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
-                    maxSize, minSizeNode, minSizeRack, splits);
+      getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
     }
 
     // create splits for all files that are not in any pool.
-    getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]), 
-                  maxSize, minSizeNode, minSizeRack, splits);
+    getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
 
     // free up rackToNodes map
     rackToNodes.clear();
@@ -253,7 +238,7 @@ public abstract class CombineFileInputFo
   /**
    * Return all the splits in the specified set of paths
    */
-  private void getMoreSplits(JobContext job, Path[] paths, 
+  private void getMoreSplits(JobContext job, List<FileStatus> stats,
                              long maxSize, long minSizeNode, long minSizeRack,
                              List<InputSplit> splits)
     throws IOException {
@@ -274,18 +259,20 @@ public abstract class CombineFileInputFo
     HashMap<String, List<OneBlockInfo>> nodeToBlocks = 
                               new HashMap<String, List<OneBlockInfo>>();
     
-    files = new OneFileInfo[paths.length];
-    if (paths.length == 0) {
+    files = new OneFileInfo[stats.size()];
+    if (stats.size() == 0) {
       return; 
     }
 
     // populate all the blocks for all files
     long totLength = 0;
-    for (int i = 0; i < paths.length; i++) {
-      files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, paths[i]),
+    int fileIdx = 0; // next free slot in files[]; advanced once per stat
+    for (FileStatus stat : stats) {
+      files[fileIdx] = new OneFileInfo(stat, conf,
+                                 isSplitable(job, stat.getPath()),
                                  rackToBlocks, blockToNodes, nodeToBlocks,
                                  rackToNodes, maxSize);
-      totLength += files[i].getLength();
+      totLength += files[fileIdx++].getLength();
     }
 
     ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
@@ -479,7 +466,7 @@ public abstract class CombineFileInputFo
     private long fileSize;               // size of the file
     private OneBlockInfo[] blocks;       // all blocks in this file
 
-    OneFileInfo(Path path, Configuration conf,
+    OneFileInfo(FileStatus stat, Configuration conf,
                 boolean isSplitable,
                 HashMap<String, List<OneBlockInfo>> rackToBlocks,
                 HashMap<OneBlockInfo, String[]> blockToNodes,
@@ -490,10 +477,13 @@ public abstract class CombineFileInputFo
       this.fileSize = 0;
 
       // get block locations from file system
-      FileSystem fs = path.getFileSystem(conf);
-      FileStatus stat = fs.getFileStatus(path);
-      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, 
-                                                           stat.getLen());
+      BlockLocation[] locations;
+      if (stat instanceof LocatedFileStatus) {
+        locations = ((LocatedFileStatus) stat).getBlockLocations();
+      } else {
+        FileSystem fs = stat.getPath().getFileSystem(conf);
+        locations = fs.getFileBlockLocations(stat, 0, stat.getLen());
+      }
       // create a list of all block and their locations
       if (locations == null) {
         blocks = new OneBlockInfo[0];
@@ -508,8 +498,8 @@ public abstract class CombineFileInputFo
           // full file length
           blocks = new OneBlockInfo[1];
           fileSize = stat.getLen();
-          blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
-              .getHosts(), locations[0].getTopologyPaths());
+          blocks[0] = new OneBlockInfo(stat.getPath(), 0, fileSize,
+              locations[0].getHosts(), locations[0].getTopologyPaths());
         } else {
           ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
               locations.length);
@@ -535,9 +525,9 @@ public abstract class CombineFileInputFo
                   myLength = Math.min(maxSize, left);
                 }
               }
-              OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
-                  myLength, locations[i].getHosts(), locations[i]
-                      .getTopologyPaths());
+              OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(),
+                  myOffset, myLength, locations[i].getHosts(),
+                  locations[i].getTopologyPaths());
               left -= myLength;
               myOffset += myLength;
 
@@ -638,6 +628,9 @@ public abstract class CombineFileInputFo
 
   protected BlockLocation[] getFileBlockLocations(
     FileSystem fs, FileStatus stat) throws IOException {
+    if (stat instanceof LocatedFileStatus) {
+      return ((LocatedFileStatus) stat).getBlockLocations();
+    }
     return fs.getFileBlockLocations(stat, 0, stat.getLen());
   }
 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1507388&r1=1507387&r2=1507388&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Fri Jul 26 18:22:26 2013
@@ -29,9 +29,11 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -254,14 +256,19 @@ public abstract class FileInputFormat<K,
       } else {
         for (FileStatus globStat: matches) {
           if (globStat.isDirectory()) {
-            for(FileStatus stat: fs.listStatus(globStat.getPath(),
-                inputFilter)) {
-              if (recursive && stat.isDirectory()) {
-                addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
-              } else {
-                result.add(stat);
+            RemoteIterator<LocatedFileStatus> iter =
+                fs.listLocatedStatus(globStat.getPath());
+            while (iter.hasNext()) {
+              LocatedFileStatus stat = iter.next();
+              if (inputFilter.accept(stat.getPath())) {
+                if (recursive && stat.isDirectory()) {
+                  addInputPathRecursively(result, fs, stat.getPath(),
+                      inputFilter);
+                } else {
+                  result.add(stat);
+                }
               }
-            }          
+            }
           } else {
             result.add(globStat);
           }
@@ -291,13 +298,17 @@ public abstract class FileInputFormat<K,
   protected void addInputPathRecursively(List<FileStatus> result,
       FileSystem fs, Path path, PathFilter inputFilter) 
       throws IOException {
-    for(FileStatus stat: fs.listStatus(path, inputFilter)) {
-      if (stat.isDirectory()) {
-        addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
-      } else {
-        result.add(stat);
+    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+    while (iter.hasNext()) {
+      LocatedFileStatus stat = iter.next();
+      if (inputFilter.accept(stat.getPath())) {
+        if (stat.isDirectory()) {
+          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+        } else {
+          result.add(stat);
+        }
       }
-    }          
+    }
   }
   
   
@@ -326,8 +337,13 @@ public abstract class FileInputFormat<K,
       Path path = file.getPath();
       long length = file.getLen();
       if (length != 0) {
-        FileSystem fs = path.getFileSystem(job.getConfiguration());
-        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+        BlockLocation[] blkLocations;
+        if (file instanceof LocatedFileStatus) {
+          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+        } else {
+          FileSystem fs = path.getFileSystem(job.getConfiguration());
+          blkLocations = fs.getFileBlockLocations(file, 0, length);
+        }
         if (isSplitable(job, path)) {
           long blockSize = file.getBlockSize();
           long splitSize = computeSplitSize(blockSize, minSize, maxSize);

Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1507388&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java Fri Jul 26 18:22:26 2013
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFileInputFormat {
+
+  @Test // getSplits should need exactly one listLocatedStatus call
+  public void testListLocatedStatus() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.setBoolean("fs.test.impl.disable.cache", false);
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        "test:///a1/a2");
+    MockFileSystem mockFs =
+        (MockFileSystem) new Path("test:///").getFileSystem(conf);
+    Assert.assertEquals("listLocatedStatus already called",
+        0, mockFs.numListLocatedStatusCalls);
+    JobConf job = new JobConf(conf);
+    TextInputFormat fileInputFormat = new TextInputFormat();
+    fileInputFormat.configure(job);
+    InputSplit[] splits = fileInputFormat.getSplits(job, 1);
+    Assert.assertEquals("Input splits are not correct", 2, splits.length);
+    Assert.assertEquals("listLocatedStatus calls",
+        1, mockFs.numListLocatedStatusCalls);
+  }
+
+  private Configuration getConfiguration() {
+    Configuration conf = new Configuration();
+    conf.set("fs.test.impl.disable.cache", "true");
+    conf.setClass("fs.test.impl", MockFileSystem.class, FileSystem.class);
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        "test:///a1");
+    return conf;
+  }
+
+  static class MockFileSystem extends RawLocalFileSystem {
+    int numListLocatedStatusCalls = 0;
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws FileNotFoundException,
+        IOException {
+      if (f.toString().equals("test:/a1")) {
+        return new FileStatus[] {
+            new FileStatus(0, true, 1, 150, 150, new Path("test:/a1/a2")),
+            new FileStatus(10, false, 1, 150, 150, new Path("test:/a1/file1")) };
+      } else if (f.toString().equals("test:/a1/a2")) {
+        return new FileStatus[] {
+            new FileStatus(10, false, 1, 150, 150,
+                new Path("test:/a1/a2/file2")),
+            new FileStatus(10, false, 1, 151, 150,
+                new Path("test:/a1/a2/file3")) };
+      }
+      return new FileStatus[0];
+    }
+
+    @Override
+    public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
+        throws IOException {
+      return new FileStatus[] { new FileStatus(10, true, 1, 150, 150,
+          pathPattern) };
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path f, PathFilter filter)
+        throws FileNotFoundException, IOException {
+      return this.listStatus(f);
+    }
+
+    @Override
+    public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
+        throws IOException {
+      return new BlockLocation[] {
+          new BlockLocation(new String[] { "localhost:50010" },
+              new String[] { "localhost" }, 0, len) };
+    }
+
+    @Override
+    protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
+        PathFilter filter) throws FileNotFoundException, IOException {
+      ++numListLocatedStatusCalls;
+      return super.listLocatedStatus(f, filter);
+    }
+  }
+}

Propchange: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1507388&r1=1507387&r2=1507388&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Fri Jul 26 18:22:26 2013
@@ -24,11 +24,14 @@ import java.util.List;
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.Test;
@@ -77,6 +80,23 @@ public class TestFileInputFormat {
         .toString());
   }
 
+  @Test // getSplits should need exactly one listLocatedStatus call
+  public void testListLocatedStatus() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.setBoolean("fs.test.impl.disable.cache", false);
+    conf.set(FileInputFormat.INPUT_DIR, "test:///a1/a2");
+    MockFileSystem mockFs =
+        (MockFileSystem) new Path("test:///").getFileSystem(conf);
+    Assert.assertEquals("listLocatedStatus already called",
+        0, mockFs.numListLocatedStatusCalls);
+    Job job = Job.getInstance(conf);
+    FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
+    List<InputSplit> splits = fileInputFormat.getSplits(job);
+    Assert.assertEquals("Input splits are not correct", 2, splits.size());
+    Assert.assertEquals("listLocatedStatus calls",
+        1, mockFs.numListLocatedStatusCalls);
+  }
+
   private Configuration getConfiguration() {
     Configuration conf = new Configuration();
     conf.set("fs.test.impl.disable.cache", "true");
@@ -86,13 +106,14 @@ public class TestFileInputFormat {
   }
 
   static class MockFileSystem extends RawLocalFileSystem {
+    int numListLocatedStatusCalls = 0;
 
     @Override
     public FileStatus[] listStatus(Path f) throws FileNotFoundException,
         IOException {
       if (f.toString().equals("test:/a1")) {
         return new FileStatus[] {
-            new FileStatus(10, true, 1, 150, 150, new Path("test:/a1/a2")),
+            new FileStatus(0, true, 1, 150, 150, new Path("test:/a1/a2")),
             new FileStatus(10, false, 1, 150, 150, new Path("test:/a1/file1")) };
       } else if (f.toString().equals("test:/a1/a2")) {
         return new FileStatus[] {
@@ -116,5 +137,20 @@ public class TestFileInputFormat {
         throws FileNotFoundException, IOException {
       return this.listStatus(f);
     }
+
+    @Override
+    public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
+        throws IOException {
+      return new BlockLocation[] {
+          new BlockLocation(new String[] { "localhost:50010" },
+              new String[] { "localhost" }, 0, len) };
+    }
+
+    @Override
+    protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
+        PathFilter filter) throws FileNotFoundException, IOException {
+      ++numListLocatedStatusCalls;
+      return super.listLocatedStatus(f, filter);
+    }
   }
 }