Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:53:12 UTC

svn commit: r1077221 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/lib/input/ test/org/apache/hadoop/mapred/

Author: omalley
Date: Fri Mar  4 03:53:12 2011
New Revision: 1077221

URL: http://svn.apache.org/viewvc?rev=1077221&view=rev
Log:
commit 4f53b797d65822904fce5f8803009b59316810cc
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date:   Thu Feb 25 00:16:28 2010 +0530

    MAPREDUCE:1466 from https://issues.apache.org/jira/secure/attachment/12436886/MAPREDUCE-1466_yhadoop20-3.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1466. Added a private configuration variable
    +    mapreduce.input.num.files, to store number of input files
    +    being processed by M/R job. (Arun Murthy via yhemanth)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java?rev=1077221&r1=1077220&r2=1077221&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java Fri Mar  4 03:53:12 2011
@@ -70,6 +70,9 @@ public abstract class FileInputFormat<K,
         return !name.startsWith("_") && !name.startsWith("."); 
       }
     }; 
+  
+  static final String NUM_INPUT_FILES = "mapreduce.input.num.files";
+  
   protected void setMinSplitSize(long minSplitSize) {
     this.minSplitSize = minSplitSize;
   }
@@ -204,6 +207,8 @@ public abstract class FileInputFormat<K,
     throws IOException {
     FileStatus[] files = listStatus(job);
     
+    // Save the number of input files in the job-conf
+    job.setLong(NUM_INPUT_FILES, files.length);
     long totalSize = 0;                           // compute total size
     for (FileStatus file: files) {                // check we have valid files
       if (file.isDir()) {
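
A minimal sketch (not part of this commit) of how the stored value behaves:
once getSplits() has run, the count can be read back through the ordinary
JobConf getters. The old-API constant above is package-private, so the key
is spelled out as a string literal here; the input directory and the -1
"unset" default are illustrative assumptions.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class NumInputFilesDemo {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        // Hypothetical input directory; any readable path with files works.
        TextInputFormat.addInputPath(job, new Path("/tmp/input"));
        TextInputFormat inFormat = new TextInputFormat();
        inFormat.configure(job);
        InputSplit[] splits = inFormat.getSplits(job, 1);
        // getSplits() has now recorded the number of (non-hidden) input files.
        System.out.println("splits: " + splits.length);
        System.out.println("files:  "
            + job.getLong("mapreduce.input.num.files", -1L));
      }
    }

The new testNumInputs() case added below exercises this same path, including
the detail that "_"-prefixed names such as _meta are rejected by the default
hidden-file filter and therefore not counted.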

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=1077221&r1=1077220&r2=1077221&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Fri Mar  4 03:53:12 2011
@@ -63,6 +63,11 @@ public abstract class FileInputFormat<K,
     }; 
 
   /**
+   * A framework-internal configuration variable that records the number of input files being processed by the M/R job; it is not intended to be set by users.
+   */
+  public static final String NUM_INPUT_FILES = "mapreduce.input.num.files";
+
+  /**
    * Proxy PathFilter that accepts a path only if all filters given in the
    * constructor do. Used by the listPaths() to apply the built-in
    * hiddenFileFilter together with a user provided one (if any).
@@ -242,7 +247,8 @@ public abstract class FileInputFormat<K,
 
     // generate splits
     List<InputSplit> splits = new ArrayList<InputSplit>();
-    for (FileStatus file: listStatus(job)) {
+    List<FileStatus> files = listStatus(job);
+    for (FileStatus file: files) {
       Path path = file.getPath();
       FileSystem fs = path.getFileSystem(job.getConfiguration());
       long length = file.getLen();
@@ -270,6 +276,10 @@ public abstract class FileInputFormat<K,
         splits.add(new FileSplit(path, 0, length, new String[0]));
       }
     }
+    
+    // Save the number of input files in the job-conf
+    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
+
     LOG.debug("Total # of splits: " + splits.size());
     return splits;
   }
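
A corresponding sketch for the new API (again, not part of this commit):
here the constant is public and the value lands in the job's underlying
Configuration, so callers can reference FileInputFormat.NUM_INPUT_FILES
directly. The zero default mirrors the one the new test below uses.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class NewApiNumInputFiles {
      // Returns the count stored by FileInputFormat.getSplits(), or 0 if
      // nothing has been stored yet.
      static long numInputFiles(Configuration conf) {
        return conf.getLong(FileInputFormat.NUM_INPUT_FILES, 0L);
      }
    }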

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=1077221&r1=1077220&r2=1077221&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java Fri Mar  4 03:53:12 2011
@@ -18,9 +18,11 @@
 package org.apache.hadoop.mapred;
 
 import java.io.DataOutputStream;
+import java.io.IOException;
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -29,61 +31,100 @@ import org.apache.hadoop.hdfs.MiniDFSClu
 
 public class TestFileInputFormat extends TestCase {
 
+  Configuration conf = new Configuration();
+  MiniDFSCluster dfs = null;
+  
+  public void setUp() throws Exception {
+    dfs = new MiniDFSCluster(conf, 4, true,
+                             new String[]{"/rack0", "/rack0", 
+                                          "/rack1", "/rack1"},
+                             new String[]{"host0", "host1", 
+                                          "host2", "host3"});
+  }
+  
   public void testLocality() throws Exception {
-    JobConf conf = new JobConf();
-    MiniDFSCluster dfs = null;
-    try {
-      dfs = new MiniDFSCluster(conf, 4, true,
-                               new String[]{"/rack0", "/rack0", 
-                                             "/rack1", "/rack1"},
-                               new String[]{"host0", "host1", 
-                                            "host2", "host3"});
-      FileSystem fs = dfs.getFileSystem();
-      System.out.println("FileSystem " + fs.getUri());
-      Path path = new Path("/foo/bar");
-      // create a multi-block file on hdfs
-      DataOutputStream out = fs.create(path, true, 4096, 
-                                       (short) 2, 512, null);
-      for(int i=0; i < 1000; ++i) {
-        out.writeChars("Hello\n");
+    JobConf job = new JobConf(conf);
+    FileSystem fs = dfs.getFileSystem();
+    System.out.println("FileSystem " + fs.getUri());
+
+    Path inputDir = new Path("/foo/");
+    String fileName = "part-0000";
+    createInputs(fs, inputDir, fileName);
+
+    // split it using a file input format
+    TextInputFormat.addInputPath(job, inputDir);
+    TextInputFormat inFormat = new TextInputFormat();
+    inFormat.configure(job);
+    InputSplit[] splits = inFormat.getSplits(job, 1);
+    FileStatus fileStatus = fs.getFileStatus(new Path(inputDir, fileName));
+    BlockLocation[] locations = 
+      fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+    System.out.println("Made splits");
+
+    // make sure that each split is a block and the locations match
+    for(int i=0; i < splits.length; ++i) {
+      FileSplit fileSplit = (FileSplit) splits[i];
+      System.out.println("File split: " + fileSplit);
+      for (String h: fileSplit.getLocations()) {
+        System.out.println("Location: " + h);
       }
-      out.close();
-      System.out.println("Wrote file");
+      System.out.println("Block: " + locations[i]);
+      assertEquals(locations[i].getOffset(), fileSplit.getStart());
+      assertEquals(locations[i].getLength(), fileSplit.getLength());
+      String[] blockLocs = locations[i].getHosts();
+      String[] splitLocs = fileSplit.getLocations();
+      assertEquals(2, blockLocs.length);
+      assertEquals(2, splitLocs.length);
+      assertTrue((blockLocs[0].equals(splitLocs[0]) && 
+                  blockLocs[1].equals(splitLocs[1])) ||
+                 (blockLocs[1].equals(splitLocs[0]) &&
+                  blockLocs[0].equals(splitLocs[1])));
+    }
 
-      // split it using a file input format
-      TextInputFormat.addInputPath(conf, path);
-      TextInputFormat inFormat = new TextInputFormat();
-      inFormat.configure(conf);
-      InputSplit[] splits = inFormat.getSplits(conf, 1);
-      FileStatus fileStatus = fs.getFileStatus(path);
-      BlockLocation[] locations = 
-        fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
-      System.out.println("Made splits");
-
-      // make sure that each split is a block and the locations match
-      for(int i=0; i < splits.length; ++i) {
-        FileSplit fileSplit = (FileSplit) splits[i];
-        System.out.println("File split: " + fileSplit);
-        for (String h: fileSplit.getLocations()) {
-          System.out.println("Location: " + h);
-        }
-        System.out.println("Block: " + locations[i]);
-        assertEquals(locations[i].getOffset(), fileSplit.getStart());
-        assertEquals(locations[i].getLength(), fileSplit.getLength());
-        String[] blockLocs = locations[i].getHosts();
-        String[] splitLocs = fileSplit.getLocations();
-        assertEquals(2, blockLocs.length);
-        assertEquals(2, splitLocs.length);
-        assertTrue((blockLocs[0].equals(splitLocs[0]) && 
-                    blockLocs[1].equals(splitLocs[1])) ||
-                   (blockLocs[1].equals(splitLocs[0]) &&
-                    blockLocs[0].equals(splitLocs[1])));
-      }
-    } finally {
-      if (dfs != null) {
-        dfs.shutdown();
-      }
+    assertEquals("Expected value of " + FileInputFormat.NUM_INPUT_FILES, 
+                 1, job.getLong(FileInputFormat.NUM_INPUT_FILES, 0));
+  }
+
+  private void createInputs(FileSystem fs, Path inDir, String fileName) 
+  throws IOException {
+    // create a multi-block file on hdfs
+    DataOutputStream out = fs.create(new Path(inDir, fileName), true, 4096, 
+                                     (short) 2, 512, null);
+    for(int i=0; i < 1000; ++i) {
+      out.writeChars("Hello\n");
     }
+    out.close();
+    System.out.println("Wrote file");
   }
+  
+  public void testNumInputs() throws Exception {
+    JobConf job = new JobConf(conf);
+    FileSystem fs = dfs.getFileSystem();
+    System.out.println("FileSystem " + fs.getUri());
+
+    Path inputDir = new Path("/foo/");
+    final int numFiles = 10;
+    String fileNameBase = "part-0000";
+    for (int i=0; i < numFiles; ++i) {
+      createInputs(fs, inputDir, fileNameBase + String.valueOf(i));  
+    }
+    createInputs(fs, inputDir, "_meta");
+    createInputs(fs, inputDir, "_temp");
+
+    // split it using a file input format
+    TextInputFormat.addInputPath(job, inputDir);
+    TextInputFormat inFormat = new TextInputFormat();
+    inFormat.configure(job);
+    InputSplit[] splits = inFormat.getSplits(job, 1);
+
+    assertEquals("Expected value of " + FileInputFormat.NUM_INPUT_FILES, 
+                 numFiles, job.getLong(FileInputFormat.NUM_INPUT_FILES, 0));
 
+  }
+  
+  public void tearDown() throws Exception {
+    if (dfs != null) {
+      dfs.shutdown();
+    }
+  }
 }