You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 07:02:06 UTC

svn commit: r1079266 - in /hadoop/mapreduce/branches/yahoo-merge/src: docs/src/documentation/content/xdocs/rumen.xml test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java tools/org/apache/hadoop/tools/rumen/TraceBuilder.java

Author: omalley
Date: Tue Mar  8 06:02:06 2011
New Revision: 1079266

URL: http://svn.apache.org/viewvc?rev=1079266&view=rev
Log:
commit 963bb3b8e8a68deef5b8a0ee8985a7c26d801cd8
Author: Ravi Gummadi <gr...@yahoo-inc.com>
Date:   Fri Feb 18 15:29:34 2011 -0800

     [MR-1978] :Rumen: Recursive scanning of input directories by
    TraceBuilder. Patch is available at
     (gravi)
    
    +++ b/YAHOO-CHANGES.txt
    +   [MR-1978] :Rumen: Recursive scanning of input directories by
    +  TraceBuilder. Patch is available at
    +   (gravi)
    +

Modified:
    hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/rumen.xml
    hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
    hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java

Modified: hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/rumen.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/rumen.xml?rev=1079266&r1=1079265&r2=1079266&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/rumen.xml (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/rumen.xml Tue Mar  8 06:02:06 2011
@@ -153,15 +153,15 @@
       <title>Trace Builder</title>
       
       <p><code>Command:</code></p>
-      <source>java org.apache.hadoop.tools.rumen.TraceBuilder [options] [jobtrace-output] [topology-output] [input]</source>
-      
+      <source>java org.apache.hadoop.tools.rumen.TraceBuilder [options] &lt;jobtrace-output&gt; &lt;topology-output&gt; &lt;inputs&gt;</source>
+
       <p>This command invokes the <code>TraceBuilder</code> utility of 
          <em>Rumen</em>. It converts the JobHistory files into a series of JSON 
-         objects and output them in the <code>[jobtrace-output]</code> file. 
-         It also extracts the cluster layout (topology) and outputs it in the
-         <code>[topology-output]</code> file. 
-         <code>[input]</code> represents a space separated list of JobHistory 
-         files and folders.
+         objects and writes them into the <code>&lt;jobtrace-output&gt;</code>
+         file. It also extracts the cluster layout (topology) and writes it in
+         the <code>&lt;topology-output&gt;</code> file.
+         <code>&lt;inputs&gt;</code> represents a space-separated list of
+         JobHistory files and folders.
       </p>
          
          <note>1) Input and output to <code>TraceBuilder</code> is expected to
@@ -173,9 +173,12 @@
                regular expressions.
          </note>
          <note>
-               2) TraceBuilder does not recursively scan the input folder for
-               job history files. Only the files that are directly placed under 
-               the input folder will be considered for generating the trace.
+               2) By default, TraceBuilder does not recursively scan the input
+               folder for job history files. Only the files that are directly
+               placed under the input folder will be considered for generating
+               the trace. To add all the files under the input directory by
+               recursively scanning the input directory, use the '-recursive'
+               option.
          </note>
       
       <p>Cluster topology is used as follows :</p>
@@ -208,6 +211,15 @@
               from the source files.
           </td>
         </tr>
+        <tr>
+          <td><code>-recursive</code></td>
+          <td>Recursively traverse input paths for job history logs.</td>
+          <td>This option should be used to inform the TraceBuilder to
+          recursively scan the input paths and process all the files under them.
+          Note that, by default, only the history logs that are directly under
+          the input folder are considered for generating the trace.
+          </td>
+        </tr>
       </table>
       
       <section>

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1079266&r1=1079265&r2=1079266&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Tue Mar  8 06:02:06 2011
@@ -345,61 +345,6 @@ public class TestRumenJobTraces {
   }
 
   /**
-   * Test if {@link TraceBuilder} can process globbed input file paths.
-   */
-  @Test
-  public void testGlobbedInput() throws Exception {
-    final Configuration conf = new Configuration();
-    final FileSystem lfs = FileSystem.getLocal(conf);
-    
-    // define the test's root temporary directory
-    final Path rootTempDir =
-      new Path(System.getProperty("test.build.data", "/tmp"))
-            .makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
-    // define the test's root input directory
-    Path testRootInputDir = new Path(rootTempDir, "TestGlobbedInputPath");
-    // define the nested input directory
-    Path nestedInputDir = new Path(testRootInputDir, "1/2/3/4");
-    // define the globbed version of the nested input directory
-    Path globbedInputNestedDir = 
-      lfs.makeQualified(new Path(testRootInputDir, "*/*/*/*/*"));
-    
-    // define a file in the nested test input directory
-    Path inputPath1 = new Path(nestedInputDir, "test.txt");
-    // define a sub-folder in the nested test input directory
-    Path inputPath2Parent = new Path(nestedInputDir, "test");
-    lfs.mkdirs(inputPath2Parent);
-    // define a file in the sub-folder within the nested test input directory
-    Path inputPath2 = new Path(inputPath2Parent, "test.txt");
-    
-    // create empty input files
-    lfs.createNewFile(inputPath1);
-    lfs.createNewFile(inputPath2);
-    
-    // define the output trace and topology files
-    Path outputTracePath = new Path(testRootInputDir, "test.json");
-    Path outputTopologyTracePath = new Path(testRootInputDir, "topology.json");
-    
-    String[] args = 
-      new String[] {outputTracePath.toString(), 
-                    outputTopologyTracePath.toString(), 
-                    globbedInputNestedDir.toString() };
-    
-    // invoke TraceBuilder's MyOptions command options parsing module/utility
-    MyOptions options = new TraceBuilder.MyOptions(args, conf);
-    
-    lfs.delete(testRootInputDir, true);
-    
-    assertEquals("Error in detecting globbed input FileSystem paths", 
-                 2, options.inputs.size());
-    
-    assertTrue("Missing input file " + inputPath1, 
-               options.inputs.contains(inputPath1));
-    assertTrue("Missing input file " + inputPath2, 
-               options.inputs.contains(inputPath2));
-  }
-
-  /**
    * Test if {@link CurrentJHParser} can read events from current JH files.
    */
   @Test
@@ -519,6 +464,163 @@ public class TestRumenJobTraces {
     validateJobConfParser("sample-conf.file.new.xml", true);
   }
 
+  /**
+   * Check if processing of input arguments is as expected by passing globbed
+   * input path
+   * <li> without -recursive option and
+   * <li> with -recursive option.
+   */
+  @Test
+  public void testProcessInputArgument() throws Exception {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    // define the test's root temporary directory
+    final Path rootTempDir =
+      new Path(System.getProperty("test.build.data", "/tmp"))
+        .makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
+    // define the test's root input directory
+    Path testRootInputDir = new Path(rootTempDir, "TestProcessInputArgument");
+    // define the nested input directory
+    Path nestedInputDir = new Path(testRootInputDir, "1/2/3/4");
+    // define the globbed version of the nested input directory
+    Path globbedInputNestedDir =
+      lfs.makeQualified(new Path(testRootInputDir, "*/*/*/*/*"));
+    try {
+      lfs.delete(nestedInputDir, true);
+
+      List<String> recursiveInputPaths = new ArrayList<String>();
+      List<String> nonRecursiveInputPaths = new ArrayList<String>();
+      // Create input files under the given path with multiple levels of
+      // sub directories
+      createHistoryLogsHierarchy(nestedInputDir, lfs, recursiveInputPaths,
+          nonRecursiveInputPaths);
+
+      // Check the case of globbed input path and without -recursive option
+      List<Path> inputs = MyOptions.processInputArgument(
+          globbedInputNestedDir.toString(), conf, false);
+      validateHistoryLogPaths(inputs, nonRecursiveInputPaths);
+
+      // Check the case of globbed input path and with -recursive option
+      inputs = MyOptions.processInputArgument(
+          globbedInputNestedDir.toString(), conf, true);
+      validateHistoryLogPaths(inputs, recursiveInputPaths);
+
+    } finally {
+      lfs.delete(testRootInputDir, true);
+    }
+  }
+
+  /**
+   * Validate if the input history log paths are as expected.
+   * @param inputs  the resultant input paths to be validated
+   * @param expectedHistoryFileNames  the expected input history logs
+   * @throws IOException
+   */
+  private void validateHistoryLogPaths(List<Path> inputs,
+      List<String> expectedHistoryFileNames) throws IOException {
+
+    System.out.println("\nExpected history files are:");
+    for (String historyFile : expectedHistoryFileNames) {
+      System.out.println(historyFile);
+    }
+
+    System.out.println("\nResultant history files are:");
+    List<String> historyLogs = new ArrayList<String>();
+    for (Path p : inputs) {
+      historyLogs.add(p.toUri().getPath());
+      System.out.println(p.toUri().getPath());
+    }
+
+    assertEquals("Number of history logs found is different from the expected.",
+        expectedHistoryFileNames.size(), inputs.size());
+
+    // Verify if all the history logs are expected ones and they are in the
+    // expected order
+    assertTrue("Some of the history log files do not match the expected.",
+        historyLogs.equals(expectedHistoryFileNames));
+  }
+
+  /**
+   * Create history logs under the given path with multiple levels of
+   * sub directories as shown below.
+   * <br>
+   * Create a file, an empty subdirectory and a nonempty subdirectory
+   * &lt;historyDir&gt; under the given input path.
+   * <br>
+   * The subdirectory &lt;historyDir&gt; contains the following dir structure:
+   * <br>
+   * <br>&lt;historyDir&gt;/historyFile1.txt
+   * <br>&lt;historyDir&gt;/historyFile1.gz
+   * <br>&lt;historyDir&gt;/subDir1/historyFile2.txt
+   * <br>&lt;historyDir&gt;/subDir1/historyFile2.gz
+   * <br>&lt;historyDir&gt;/subDir2/historyFile3.txt
+   * <br>&lt;historyDir&gt;/subDir2/historyFile3.gz
+   * <br>&lt;historyDir&gt;/subDir1/subDir11/historyFile4.txt
+   * <br>&lt;historyDir&gt;/subDir1/subDir11/historyFile4.gz
+   * <br>&lt;historyDir&gt;/subDir2/subDir21/
+   * <br>
+   * Create the lists of input paths that should be processed by TraceBuilder
+   * for recursive case and non-recursive case.
+   * @param nestedInputDir the input history logs directory where history files
+   *                       with nested subdirectories are created
+   * @param fs         FileSystem of the input paths
+   * @param recursiveInputPaths input paths for recursive case
+   * @param nonRecursiveInputPaths input paths for non-recursive case
+   * @throws IOException
+   */
+  private void createHistoryLogsHierarchy(Path nestedInputDir, FileSystem fs,
+      List<String> recursiveInputPaths, List<String> nonRecursiveInputPaths)
+      throws IOException {
+    List<Path> dirs = new ArrayList<Path>();
+    // define a file in the nested test input directory
+    Path inputPath1 = new Path(nestedInputDir, "historyFile.txt");
+    // define an empty sub-folder in the nested test input directory
+    Path emptyDir = new Path(nestedInputDir, "emptyDir");
+    // define a nonempty sub-folder in the nested test input directory
+    Path historyDir = new Path(nestedInputDir, "historyDir");
+
+    fs.mkdirs(nestedInputDir);
+    // Create an empty input file
+    fs.createNewFile(inputPath1);
+    // Create empty subdir
+    fs.mkdirs(emptyDir);// let us not create any files under this dir
+
+    fs.mkdirs(historyDir);
+    dirs.add(historyDir);
+
+    Path subDir1 = new Path(historyDir, "subDir1");
+    fs.mkdirs(subDir1);
+    dirs.add(subDir1);
+    Path subDir2 = new Path(historyDir, "subDir2");
+    fs.mkdirs(subDir2);
+    dirs.add(subDir2);
+
+    Path subDir11 = new Path(subDir1, "subDir11");
+    fs.mkdirs(subDir11);
+    dirs.add(subDir11);
+    Path subDir21 = new Path(subDir2, "subDir21");
+    fs.mkdirs(subDir21);// let us not create any files under this dir
+
+    int i = 0;
+    for (Path dir : dirs) {
+      i++;
+      Path gzPath = new Path(dir, "historyFile" + i + ".gz");
+      Path txtPath = new Path(dir, "historyFile" + i + ".txt");
+      fs.createNewFile(txtPath);
+      fs.createNewFile(gzPath);
+      recursiveInputPaths.add(gzPath.toUri().getPath());
+      recursiveInputPaths.add(txtPath.toUri().getPath());
+      if (i == 1) {
+        nonRecursiveInputPaths.add(gzPath.toUri().getPath());
+        nonRecursiveInputPaths.add(txtPath.toUri().getPath());
+      }
+    }
+    recursiveInputPaths.add(inputPath1.toUri().getPath());
+    nonRecursiveInputPaths.add(inputPath1.toUri().getPath());
+  }
+
+
   private void validateJobConfParser(String confFile, boolean newConfig)
       throws Exception {
     final Configuration conf = new Configuration();

Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java?rev=1079266&r1=1079265&r2=1079266&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java Tue Mar  8 06:02:06 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
@@ -34,7 +35,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
@@ -67,69 +70,110 @@ public class TraceBuilder extends Config
         IOException, ClassNotFoundException {
       int switchTop = 0;
 
+      // to determine if the input paths should be recursively scanned or not
+      boolean doRecursiveTraversal = false;
+
       while (args[switchTop].startsWith("-")) {
         if (args[switchTop].equalsIgnoreCase("-demuxer")) {
           inputDemuxerClass =
               Class.forName(args[++switchTop]).asSubclass(InputDemuxer.class);
-
-          ++switchTop;
+        } else if (args[switchTop].equalsIgnoreCase("-recursive")) {
+          doRecursiveTraversal = true;
         }
+        ++switchTop;
       }
 
       traceOutput = new Path(args[0 + switchTop]);
       topologyOutput = new Path(args[1 + switchTop]);
 
       for (int i = 2 + switchTop; i < args.length; ++i) {
-        processInputArguments(args[i], conf);
+        inputs.addAll(processInputArgument(
+            args[i], conf, doRecursiveTraversal));
+      }
+    }
+
+    /**
+     * Compare the history file names, not the full paths.
+     * Job history file name format is such that doing lexicographic sort on the
+     * history file names should result in the order of jobs' submission times.
+     */
+    private static class HistoryLogsComparator
+        implements Comparator<FileStatus> {
+      @Override
+      public int compare(FileStatus file1, FileStatus file2) {
+        return file1.getPath().getName().compareTo(
+            file2.getPath().getName());
       }
     }
-    
-    /** Processes the input file/folder arguments. If the input is a file then 
-     *  it is directly considered for further processing. If the input is a 
-     *  folder, then all the files in the input folder are considered for 
-     *  further processing.
+
+    /**
+     * Processes the input file/folder argument. If the input is a file,
+     * then it is directly considered for further processing by TraceBuilder.
+     * If the input is a folder, then all the history logs in the
+     * input folder are considered for further processing.
+     *
+     * If isRecursive is true, then the input path is recursively scanned
+     * for job history logs for further processing by TraceBuilder.
      *
-     *  NOTE: If the input represents a globbed path, then it is first flattened
-     *        and then the individual paths represented by the globbed input
-     *        path are processed.
+     * NOTE: If the input represents a globbed path, then it is first flattened
+     *       and then the individual paths represented by the globbed input
+     *       path are considered for further processing.
+     *
+     * @param input        input path, possibly globbed
+     * @param conf         configuration
+     * @param isRecursive  whether to recursively traverse the input paths to
+     *                     find history logs
+     * @return the input history log files' paths
+     * @throws FileNotFoundException
+     * @throws IOException
      */
-    private void processInputArguments(String input, Configuration conf) 
-    throws IOException {
+    static List<Path> processInputArgument(String input, Configuration conf,
+        boolean isRecursive) throws FileNotFoundException, IOException {
       Path inPath = new Path(input);
       FileSystem fs = inPath.getFileSystem(conf);
       FileStatus[] inStatuses = fs.globStatus(inPath);
-      
+
+      List<Path> inputPaths = new LinkedList<Path>();
       if (inStatuses == null || inStatuses.length == 0) {
-        return;
+        return inputPaths;
       }
-      
+
       for (FileStatus inStatus : inStatuses) {
         Path thisPath = inStatus.getPath();
         if (inStatus.isDirectory()) {
-          FileStatus[] statuses = fs.listStatus(thisPath);
-
-          List<String> dirNames = new ArrayList<String>();
 
-          for (FileStatus s : statuses) {
-            if (s.isDirectory()) continue;
-            String name = s.getPath().getName();
+          // Find list of files in this path (recursively if -recursive option
+          // is specified).
+          List<FileStatus> historyLogs = new ArrayList<FileStatus>();
+
+          RemoteIterator<LocatedFileStatus> iter =
+            fs.listFiles(thisPath, isRecursive);
+          while (iter.hasNext()) {
+            LocatedFileStatus child = iter.next();
+            String fileName = child.getPath().getName();
 
-            if (!(name.endsWith(".crc") || name.startsWith("."))) {
-              dirNames.add(name);
+            if (!(fileName.endsWith(".crc") || fileName.startsWith("."))) {
+              historyLogs.add(child);
             }
           }
 
-          String[] sortableNames = dirNames.toArray(new String[1]);
+          if (historyLogs.size() > 0) {
+            // Add the sorted history log file names in this path to the
+            // inputPaths list
+            FileStatus[] sortableNames = historyLogs.toArray(
+                new FileStatus[historyLogs.size()]);
+            Arrays.sort(sortableNames, new HistoryLogsComparator());
 
-          Arrays.sort(sortableNames);
-
-          for (String dirName : sortableNames) {
-            inputs.add(new Path(thisPath, dirName));
+            for (FileStatus historyLog : sortableNames) {
+              inputPaths.add(historyLog.getPath());
+            }
           }
         } else {
-          inputs.add(thisPath);
+          inputPaths.add(thisPath);
         }
       }
+
+      return inputPaths;
     }
   }