You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:53:12 UTC
svn commit: r1077221 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/mapred/
mapred/org/apache/hadoop/mapreduce/lib/input/ test/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Mar 4 03:53:12 2011
New Revision: 1077221
URL: http://svn.apache.org/viewvc?rev=1077221&view=rev
Log:
commit 4f53b797d65822904fce5f8803009b59316810cc
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date: Thu Feb 25 00:16:28 2010 +0530
MAPREDUCE-1466 from https://issues.apache.org/jira/secure/attachment/12436886/MAPREDUCE-1466_yhadoop20-3.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1466. Added a private configuration variable
+ mapreduce.input.num.files, to store number of input files
+ being processed by M/R job. (Arun Murthy via yhemanth)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java?rev=1077221&r1=1077220&r2=1077221&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java Fri Mar 4 03:53:12 2011
@@ -70,6 +70,9 @@ public abstract class FileInputFormat<K,
return !name.startsWith("_") && !name.startsWith(".");
}
};
+
+ static final String NUM_INPUT_FILES = "mapreduce.input.num.files";
+
protected void setMinSplitSize(long minSplitSize) {
this.minSplitSize = minSplitSize;
}
@@ -204,6 +207,8 @@ public abstract class FileInputFormat<K,
throws IOException {
FileStatus[] files = listStatus(job);
+ // Save the number of input files in the job-conf
+ job.setLong(NUM_INPUT_FILES, files.length);
long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have valid files
if (file.isDir()) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1077221&r1=1077220&r2=1077221&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Fri Mar 4 03:53:12 2011
@@ -63,6 +63,11 @@ public abstract class FileInputFormat<K,
};
/**
+ * {@link #NUM_INPUT_FILES} is an internal configuration key and is not part of the public API, despite its visibility.
+ */
+ public static final String NUM_INPUT_FILES = "mapreduce.input.num.files";
+
+ /**
* Proxy PathFilter that accepts a path only if all filters given in the
* constructor do. Used by the listPaths() to apply the built-in
* hiddenFileFilter together with a user provided one (if any).
@@ -242,7 +247,8 @@ public abstract class FileInputFormat<K,
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
- for (FileStatus file: listStatus(job)) {
+ List<FileStatus>files = listStatus(job);
+ for (FileStatus file: files) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job.getConfiguration());
long length = file.getLen();
@@ -270,6 +276,10 @@ public abstract class FileInputFormat<K,
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
+
+ // Save the number of input files in the job-conf
+ job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
+
LOG.debug("Total # of splits: " + splits.size());
return splits;
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1077221&r1=1077220&r2=1077221&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java Fri Mar 4 03:53:12 2011
@@ -18,9 +18,11 @@
package org.apache.hadoop.mapred;
import java.io.DataOutputStream;
+import java.io.IOException;
import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -29,61 +31,100 @@ import org.apache.hadoop.hdfs.MiniDFSClu
public class TestFileInputFormat extends TestCase {
+ Configuration conf = new Configuration();
+ MiniDFSCluster dfs = null;
+
+ public void setUp() throws Exception {
+ dfs = new MiniDFSCluster(conf, 4, true,
+ new String[]{"/rack0", "/rack0",
+ "/rack1", "/rack1"},
+ new String[]{"host0", "host1",
+ "host2", "host3"});
+ }
+
public void testLocality() throws Exception {
- JobConf conf = new JobConf();
- MiniDFSCluster dfs = null;
- try {
- dfs = new MiniDFSCluster(conf, 4, true,
- new String[]{"/rack0", "/rack0",
- "/rack1", "/rack1"},
- new String[]{"host0", "host1",
- "host2", "host3"});
- FileSystem fs = dfs.getFileSystem();
- System.out.println("FileSystem " + fs.getUri());
- Path path = new Path("/foo/bar");
- // create a multi-block file on hdfs
- DataOutputStream out = fs.create(path, true, 4096,
- (short) 2, 512, null);
- for(int i=0; i < 1000; ++i) {
- out.writeChars("Hello\n");
+ JobConf job = new JobConf(conf);
+ FileSystem fs = dfs.getFileSystem();
+ System.out.println("FileSystem " + fs.getUri());
+
+ Path inputDir = new Path("/foo/");
+ String fileName = "part-0000";
+ createInputs(fs, inputDir, fileName);
+
+ // split it using a file input format
+ TextInputFormat.addInputPath(job, inputDir);
+ TextInputFormat inFormat = new TextInputFormat();
+ inFormat.configure(job);
+ InputSplit[] splits = inFormat.getSplits(job, 1);
+ FileStatus fileStatus = fs.getFileStatus(new Path(inputDir, fileName));
+ BlockLocation[] locations =
+ fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+ System.out.println("Made splits");
+
+ // make sure that each split is a block and the locations match
+ for(int i=0; i < splits.length; ++i) {
+ FileSplit fileSplit = (FileSplit) splits[i];
+ System.out.println("File split: " + fileSplit);
+ for (String h: fileSplit.getLocations()) {
+ System.out.println("Location: " + h);
}
- out.close();
- System.out.println("Wrote file");
+ System.out.println("Block: " + locations[i]);
+ assertEquals(locations[i].getOffset(), fileSplit.getStart());
+ assertEquals(locations[i].getLength(), fileSplit.getLength());
+ String[] blockLocs = locations[i].getHosts();
+ String[] splitLocs = fileSplit.getLocations();
+ assertEquals(2, blockLocs.length);
+ assertEquals(2, splitLocs.length);
+ assertTrue((blockLocs[0].equals(splitLocs[0]) &&
+ blockLocs[1].equals(splitLocs[1])) ||
+ (blockLocs[1].equals(splitLocs[0]) &&
+ blockLocs[0].equals(splitLocs[1])));
+ }
- // split it using a file input format
- TextInputFormat.addInputPath(conf, path);
- TextInputFormat inFormat = new TextInputFormat();
- inFormat.configure(conf);
- InputSplit[] splits = inFormat.getSplits(conf, 1);
- FileStatus fileStatus = fs.getFileStatus(path);
- BlockLocation[] locations =
- fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
- System.out.println("Made splits");
-
- // make sure that each split is a block and the locations match
- for(int i=0; i < splits.length; ++i) {
- FileSplit fileSplit = (FileSplit) splits[i];
- System.out.println("File split: " + fileSplit);
- for (String h: fileSplit.getLocations()) {
- System.out.println("Location: " + h);
- }
- System.out.println("Block: " + locations[i]);
- assertEquals(locations[i].getOffset(), fileSplit.getStart());
- assertEquals(locations[i].getLength(), fileSplit.getLength());
- String[] blockLocs = locations[i].getHosts();
- String[] splitLocs = fileSplit.getLocations();
- assertEquals(2, blockLocs.length);
- assertEquals(2, splitLocs.length);
- assertTrue((blockLocs[0].equals(splitLocs[0]) &&
- blockLocs[1].equals(splitLocs[1])) ||
- (blockLocs[1].equals(splitLocs[0]) &&
- blockLocs[0].equals(splitLocs[1])));
- }
- } finally {
- if (dfs != null) {
- dfs.shutdown();
- }
+ assertEquals("Expected value of " + FileInputFormat.NUM_INPUT_FILES,
+ 1, job.getLong(FileInputFormat.NUM_INPUT_FILES, 0));
+ }
+
+ private void createInputs(FileSystem fs, Path inDir, String fileName)
+ throws IOException {
+ // create a multi-block file on hdfs
+ DataOutputStream out = fs.create(new Path(inDir, fileName), true, 4096,
+ (short) 2, 512, null);
+ for(int i=0; i < 1000; ++i) {
+ out.writeChars("Hello\n");
}
+ out.close();
+ System.out.println("Wrote file");
}
+
+ public void testNumInputs() throws Exception {
+ JobConf job = new JobConf(conf);
+ FileSystem fs = dfs.getFileSystem();
+ System.out.println("FileSystem " + fs.getUri());
+
+ Path inputDir = new Path("/foo/");
+ final int numFiles = 10;
+ String fileNameBase = "part-0000";
+ for (int i=0; i < numFiles; ++i) {
+ createInputs(fs, inputDir, fileNameBase + String.valueOf(i));
+ }
+ createInputs(fs, inputDir, "_meta");
+ createInputs(fs, inputDir, "_temp");
+
+ // split it using a file input format
+ TextInputFormat.addInputPath(job, inputDir);
+ TextInputFormat inFormat = new TextInputFormat();
+ inFormat.configure(job);
+ InputSplit[] splits = inFormat.getSplits(job, 1);
+
+ assertEquals("Expected value of " + FileInputFormat.NUM_INPUT_FILES,
+ numFiles, job.getLong(FileInputFormat.NUM_INPUT_FILES, 0));
+ }
+
+ public void tearDown() throws Exception {
+ if (dfs != null) {
+ dfs.shutdown();
+ }
+ }
}