You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this rendering.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2013/07/26 20:22:26 UTC
svn commit: r1507388 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/o...
Author: jlowe
Date: Fri Jul 26 18:22:26 2013
New Revision: 1507388
URL: http://svn.apache.org/r1507388
Log:
MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus. Contributed by Hairong Kuang and Jason Lowe
Added:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (with props)
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1507388&r1=1507387&r2=1507388&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Jul 26 18:22:26 2013
@@ -10,6 +10,9 @@ Release 0.23.10 - UNRELEASED
OPTIMIZATIONS
+ MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus
+ (Hairong Kuang and Jason Lowe via jlowe)
+
BUG FIXES
MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1507388&r1=1507387&r2=1507388&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Fri Jul 26 18:22:26 2013
@@ -36,8 +36,10 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
@@ -164,13 +166,17 @@ public abstract class FileInputFormat<K,
protected void addInputPathRecursively(List<FileStatus> result,
FileSystem fs, Path path, PathFilter inputFilter)
throws IOException {
- for(FileStatus stat: fs.listStatus(path, inputFilter)) {
- if (stat.isDirectory()) {
- addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
- } else {
- result.add(stat);
+ RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path); // unfiltered listing; inputFilter is applied per entry below
+ while (iter.hasNext()) {
+ LocatedFileStatus stat = iter.next();
+ if (inputFilter.accept(stat.getPath())) {
+ if (stat.isDirectory()) {
+ addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+ } else {
+ result.add(stat); // keep the LocatedFileStatus so getSplits can reuse its block locations
+ }
}
- }
+ }
}
/** List input directories.
@@ -216,14 +222,19 @@ public abstract class FileInputFormat<K,
} else {
for (FileStatus globStat: matches) {
if (globStat.isDirectory()) {
- for(FileStatus stat: fs.listStatus(globStat.getPath(),
- inputFilter)) {
- if (recursive && stat.isDirectory()) {
- addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
- } else {
- result.add(stat);
+ RemoteIterator<LocatedFileStatus> iter =
+ fs.listLocatedStatus(globStat.getPath());
+ while (iter.hasNext()) {
+ LocatedFileStatus stat = iter.next();
+ if (inputFilter.accept(stat.getPath())) {
+ if (recursive && stat.isDirectory()) {
+ addInputPathRecursively(result, fs, stat.getPath(),
+ inputFilter);
+ } else {
+ result.add(stat);
+ }
}
- }
+ }
} else {
result.add(globStat);
}
@@ -249,7 +260,6 @@ public abstract class FileInputFormat<K,
/** Splits files returned by {@link #listStatus(JobConf)} when
* they're too big.*/
- @SuppressWarnings("deprecation")
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
FileStatus[] files = listStatus(job);
@@ -273,31 +283,38 @@ public abstract class FileInputFormat<K,
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
Path path = file.getPath();
- FileSystem fs = path.getFileSystem(job);
long length = file.getLen();
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
- if ((length != 0) && isSplitable(fs, path)) {
- long blockSize = file.getBlockSize();
- long splitSize = computeSplitSize(goalSize, minSize, blockSize);
-
- long bytesRemaining = length;
- while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
- String[] splitHosts = getSplitHosts(blkLocations,
- length-bytesRemaining, splitSize, clusterMap);
- splits.add(makeSplit(path, length-bytesRemaining, splitSize,
- splitHosts));
- bytesRemaining -= splitSize;
+ if (length != 0) {
+ FileSystem fs = path.getFileSystem(job);
+ BlockLocation[] blkLocations;
+ if (file instanceof LocatedFileStatus) {
+ blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+ } else {
+ blkLocations = fs.getFileBlockLocations(file, 0, length);
+ }
+ if (isSplitable(fs, path)) {
+ long blockSize = file.getBlockSize();
+ long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+
+ long bytesRemaining = length;
+ while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
+ String[] splitHosts = getSplitHosts(blkLocations,
+ length-bytesRemaining, splitSize, clusterMap);
+ splits.add(makeSplit(path, length-bytesRemaining, splitSize,
+ splitHosts));
+ bytesRemaining -= splitSize;
+ }
+
+ if (bytesRemaining != 0) {
+ String[] splitHosts = getSplitHosts(blkLocations, length
+ - bytesRemaining, bytesRemaining, clusterMap);
+ splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
+ splitHosts));
+ }
+ } else {
+ String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
+ splits.add(makeSplit(path, 0, length, splitHosts));
}
-
- if (bytesRemaining != 0) {
- String[] splitHosts = getSplitHosts(blkLocations, length
- - bytesRemaining, bytesRemaining, clusterMap);
- splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
- splitHosts));
- }
- } else if (length != 0) {
- String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
- splits.add(makeSplit(path, 0, length, splitHosts));
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1507388&r1=1507387&r2=1507388&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Fri Jul 26 18:22:26 2013
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.lib.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedList;
import java.util.HashSet;
import java.util.List;
import java.util.HashMap;
@@ -33,7 +32,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
@@ -203,47 +202,33 @@ public abstract class CombineFileInputFo
}
// all the files in input set
- Path[] paths = FileUtil.stat2Paths(
- listStatus(job).toArray(new FileStatus[0]));
+ List<FileStatus> stats = listStatus(job);
List<InputSplit> splits = new ArrayList<InputSplit>();
- if (paths.length == 0) {
+ if (stats.size() == 0) {
return splits;
}
- // Convert them to Paths first. This is a costly operation and
- // we should do it first, otherwise we will incur doing it multiple
- // times, one time each for each pool in the next loop.
- List<Path> newpaths = new LinkedList<Path>();
- for (int i = 0; i < paths.length; i++) {
- FileSystem fs = paths[i].getFileSystem(conf);
- Path p = fs.makeQualified(paths[i]);
- newpaths.add(p);
- }
- paths = null;
-
// In one single iteration, process all the paths in a single pool.
// Processing one pool at a time ensures that a split contains paths
// from a single pool only.
for (MultiPathFilter onepool : pools) {
- ArrayList<Path> myPaths = new ArrayList<Path>();
+ ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();
// pick one input path. If it matches all the filters in a pool,
// add it to the output set
- for (Iterator<Path> iter = newpaths.iterator(); iter.hasNext();) {
- Path p = iter.next();
- if (onepool.accept(p)) {
+ for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
+ FileStatus p = iter.next();
+ if (onepool.accept(p.getPath())) {
myPaths.add(p); // add it to my output set
iter.remove();
}
}
// create splits for all files in this pool.
- getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
- maxSize, minSizeNode, minSizeRack, splits);
+ getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
}
// create splits for all files that are not in any pool.
- getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]),
- maxSize, minSizeNode, minSizeRack, splits);
+ getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
// free up rackToNodes map
rackToNodes.clear();
@@ -253,7 +238,7 @@ public abstract class CombineFileInputFo
/**
* Return all the splits in the specified set of paths
*/
- private void getMoreSplits(JobContext job, Path[] paths,
+ private void getMoreSplits(JobContext job, List<FileStatus> stats,
long maxSize, long minSizeNode, long minSizeRack,
List<InputSplit> splits)
throws IOException {
@@ -274,18 +259,21 @@ public abstract class CombineFileInputFo
HashMap<String, List<OneBlockInfo>> nodeToBlocks =
new HashMap<String, List<OneBlockInfo>>();
- files = new OneFileInfo[paths.length];
- if (paths.length == 0) {
+ files = new OneFileInfo[stats.size()];
+ if (stats.size() == 0) {
return;
}
// populate all the blocks for all files
long totLength = 0;
- for (int i = 0; i < paths.length; i++) {
- files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, paths[i]),
+ int fileIdx = 0;
+ for (FileStatus stat : stats) {
+ files[fileIdx] = new OneFileInfo(stat, conf,
+ isSplitable(job, stat.getPath()),
rackToBlocks, blockToNodes, nodeToBlocks,
rackToNodes, maxSize);
- totLength += files[i].getLength();
+ totLength += files[fileIdx].getLength();
+ fileIdx++; // advance; without this every file overwrites files[0] and the rest stay null
}
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
@@ -479,7 +466,7 @@ public abstract class CombineFileInputFo
private long fileSize; // size of the file
private OneBlockInfo[] blocks; // all blocks in this file
- OneFileInfo(Path path, Configuration conf,
+ OneFileInfo(FileStatus stat, Configuration conf,
boolean isSplitable,
HashMap<String, List<OneBlockInfo>> rackToBlocks,
HashMap<OneBlockInfo, String[]> blockToNodes,
@@ -490,10 +477,13 @@ public abstract class CombineFileInputFo
this.fileSize = 0;
// get block locations from file system
- FileSystem fs = path.getFileSystem(conf);
- FileStatus stat = fs.getFileStatus(path);
- BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
- stat.getLen());
+ BlockLocation[] locations;
+ if (stat instanceof LocatedFileStatus) {
+ locations = ((LocatedFileStatus) stat).getBlockLocations();
+ } else {
+ FileSystem fs = stat.getPath().getFileSystem(conf);
+ locations = fs.getFileBlockLocations(stat, 0, stat.getLen());
+ }
// create a list of all block and their locations
if (locations == null) {
blocks = new OneBlockInfo[0];
@@ -508,8 +498,8 @@ public abstract class CombineFileInputFo
// full file length
blocks = new OneBlockInfo[1];
fileSize = stat.getLen();
- blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
- .getHosts(), locations[0].getTopologyPaths());
+ blocks[0] = new OneBlockInfo(stat.getPath(), 0, fileSize,
+ locations[0].getHosts(), locations[0].getTopologyPaths());
} else {
ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
locations.length);
@@ -535,9 +525,9 @@ public abstract class CombineFileInputFo
myLength = Math.min(maxSize, left);
}
}
- OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
- myLength, locations[i].getHosts(), locations[i]
- .getTopologyPaths());
+ OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(),
+ myOffset, myLength, locations[i].getHosts(),
+ locations[i].getTopologyPaths());
left -= myLength;
myOffset += myLength;
@@ -638,6 +628,9 @@ public abstract class CombineFileInputFo
protected BlockLocation[] getFileBlockLocations(
FileSystem fs, FileStatus stat) throws IOException {
+ if (stat instanceof LocatedFileStatus) { // locations were already fetched by the listing; skip the extra lookup
+ return ((LocatedFileStatus) stat).getBlockLocations();
+ }
return fs.getFileBlockLocations(stat, 0, stat.getLen());
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1507388&r1=1507387&r2=1507388&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Fri Jul 26 18:22:26 2013
@@ -29,9 +29,11 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -254,14 +256,19 @@ public abstract class FileInputFormat<K,
} else {
for (FileStatus globStat: matches) {
if (globStat.isDirectory()) {
- for(FileStatus stat: fs.listStatus(globStat.getPath(),
- inputFilter)) {
- if (recursive && stat.isDirectory()) {
- addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
- } else {
- result.add(stat);
+ RemoteIterator<LocatedFileStatus> iter =
+ fs.listLocatedStatus(globStat.getPath());
+ while (iter.hasNext()) {
+ LocatedFileStatus stat = iter.next();
+ if (inputFilter.accept(stat.getPath())) {
+ if (recursive && stat.isDirectory()) {
+ addInputPathRecursively(result, fs, stat.getPath(),
+ inputFilter);
+ } else {
+ result.add(stat);
+ }
}
- }
+ }
} else {
result.add(globStat);
}
@@ -291,13 +298,17 @@ public abstract class FileInputFormat<K,
protected void addInputPathRecursively(List<FileStatus> result,
FileSystem fs, Path path, PathFilter inputFilter)
throws IOException {
- for(FileStatus stat: fs.listStatus(path, inputFilter)) {
- if (stat.isDirectory()) {
- addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
- } else {
- result.add(stat);
+ RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path); // unfiltered listing; inputFilter is applied per entry below
+ while (iter.hasNext()) {
+ LocatedFileStatus stat = iter.next();
+ if (inputFilter.accept(stat.getPath())) {
+ if (stat.isDirectory()) {
+ addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+ } else {
+ result.add(stat); // keep the LocatedFileStatus so getSplits can reuse its block locations
+ }
}
- }
+ }
}
@@ -326,8 +337,13 @@ public abstract class FileInputFormat<K,
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
- FileSystem fs = path.getFileSystem(job.getConfiguration());
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+ BlockLocation[] blkLocations;
+ if (file instanceof LocatedFileStatus) {
+ blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+ } else {
+ FileSystem fs = path.getFileSystem(job.getConfiguration());
+ blkLocations = fs.getFileBlockLocations(file, 0, length);
+ }
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
Added: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1507388&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java Fri Jul 26 18:22:26 2013
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link FileInputFormat} split computation.
+ */
+public class TestFileInputFormat {
+
+ @Test
+ public void testListLocatedStatus() throws Exception {
+ Configuration conf = getConfiguration();
+ conf.setBoolean("fs.test.impl.disable.cache", false); // share one MockFileSystem instance with getSplits
+ conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+ "test:///a1/a2");
+ MockFileSystem mockFs =
+ (MockFileSystem) new Path("test:///").getFileSystem(conf);
+ Assert.assertEquals("listLocatedStatus already called",
+ 0, mockFs.numListLocatedStatusCalls);
+ JobConf job = new JobConf(conf);
+ TextInputFormat fileInputFormat = new TextInputFormat();
+ fileInputFormat.configure(job);
+ InputSplit[] splits = fileInputFormat.getSplits(job, 1);
+ Assert.assertEquals("Input splits are not correct", 2, splits.length);
+ Assert.assertEquals("listLocatedStatus calls",
+ 1, mockFs.numListLocatedStatusCalls);
+ }
+
+ private Configuration getConfiguration() {
+ Configuration conf = new Configuration();
+ conf.set("fs.test.impl.disable.cache", "true");
+ conf.setClass("fs.test.impl", MockFileSystem.class, FileSystem.class);
+ conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+ "test:///a1");
+ return conf;
+ }
+
+ /**
+ * Fake filesystem for the "test" scheme: serves a fixed directory tree
+ * from listStatus and counts listLocatedStatus calls.
+ */
+ static class MockFileSystem extends RawLocalFileSystem {
+ int numListLocatedStatusCalls = 0;
+
+ @Override
+ public FileStatus[] listStatus(Path f) throws FileNotFoundException,
+ IOException {
+ if (f.toString().equals("test:/a1")) {
+ return new FileStatus[] {
+ new FileStatus(0, true, 1, 150, 150, new Path("test:/a1/a2")),
+ new FileStatus(10, false, 1, 150, 150, new Path("test:/a1/file1")) };
+ } else if (f.toString().equals("test:/a1/a2")) {
+ return new FileStatus[] {
+ new FileStatus(10, false, 1, 150, 150,
+ new Path("test:/a1/a2/file2")),
+ new FileStatus(10, false, 1, 151, 150,
+ new Path("test:/a1/a2/file3")) };
+ }
+ return new FileStatus[0];
+ }
+
+ @Override
+ public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
+ throws IOException {
+ return new FileStatus[] { new FileStatus(10, true, 1, 150, 150,
+ pathPattern) };
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path f, PathFilter filter)
+ throws FileNotFoundException, IOException {
+ return this.listStatus(f);
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
+ throws IOException {
+ return new BlockLocation[] {
+ new BlockLocation(new String[] { "localhost:50010" },
+ new String[] { "localhost" }, 0, len) };
+ }
+
+ @Override
+ protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
+ PathFilter filter) throws FileNotFoundException, IOException {
+ ++numListLocatedStatusCalls;
+ return super.listLocatedStatus(f, filter);
+ }
+ }
+}
Propchange: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=1507388&r1=1507387&r2=1507388&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Fri Jul 26 18:22:26 2013
@@ -24,11 +24,14 @@ import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Test;
@@ -77,6 +80,23 @@ public class TestFileInputFormat {
.toString());
}
+ @Test
+ public void testListLocatedStatus() throws Exception {
+ Configuration conf = getConfiguration();
+ conf.setBoolean("fs.test.impl.disable.cache", false); // share one MockFileSystem instance with getSplits
+ conf.set(FileInputFormat.INPUT_DIR, "test:///a1/a2");
+ MockFileSystem mockFs =
+ (MockFileSystem) new Path("test:///").getFileSystem(conf);
+ Assert.assertEquals("listLocatedStatus already called",
+ 0, mockFs.numListLocatedStatusCalls);
+ Job job = Job.getInstance(conf);
+ FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
+ List<InputSplit> splits = fileInputFormat.getSplits(job);
+ Assert.assertEquals("Input splits are not correct", 2, splits.size());
+ Assert.assertEquals("listLocatedStatus calls", // exactly one listing of the single input directory
+ 1, mockFs.numListLocatedStatusCalls);
+ }
+
private Configuration getConfiguration() {
Configuration conf = new Configuration();
conf.set("fs.test.impl.disable.cache", "true");
@@ -86,13 +106,14 @@ public class TestFileInputFormat {
}
static class MockFileSystem extends RawLocalFileSystem {
+ int numListLocatedStatusCalls = 0;
@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
if (f.toString().equals("test:/a1")) {
return new FileStatus[] {
- new FileStatus(10, true, 1, 150, 150, new Path("test:/a1/a2")),
+ new FileStatus(0, true, 1, 150, 150, new Path("test:/a1/a2")),
new FileStatus(10, false, 1, 150, 150, new Path("test:/a1/file1")) };
} else if (f.toString().equals("test:/a1/a2")) {
return new FileStatus[] {
@@ -116,5 +137,20 @@ public class TestFileInputFormat {
throws FileNotFoundException, IOException {
return this.listStatus(f);
}
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
+ throws IOException {
+ return new BlockLocation[] {
+ new BlockLocation(new String[] { "localhost:50010" },
+ new String[] { "localhost" }, 0, len) };
+ }
+
+ @Override
+ protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
+ PathFilter filter) throws FileNotFoundException, IOException {
+ ++numListLocatedStatusCalls;
+ return super.listLocatedStatus(f, filter);
+ }
}
}