You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 07:02:06 UTC
svn commit: r1079266 - in /hadoop/mapreduce/branches/yahoo-merge/src:
docs/src/documentation/content/xdocs/rumen.xml
test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
Author: omalley
Date: Tue Mar 8 06:02:06 2011
New Revision: 1079266
URL: http://svn.apache.org/viewvc?rev=1079266&view=rev
Log:
commit 963bb3b8e8a68deef5b8a0ee8985a7c26d801cd8
Author: Ravi Gummadi <gr...@yahoo-inc.com>
Date: Fri Feb 18 15:29:34 2011 -0800
[MR-1978] :Rumen: Recursive scanning of input directories by
TraceBuilder. Patch is available at
(gravi)
+++ b/YAHOO-CHANGES.txt
+ [MR-1978] :Rumen: Recursive scanning of input directories by
+ TraceBuilder. Patch is available at
+ (gravi)
+
Modified:
hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/rumen.xml
hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
Modified: hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/rumen.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/rumen.xml?rev=1079266&r1=1079265&r2=1079266&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/rumen.xml (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/docs/src/documentation/content/xdocs/rumen.xml Tue Mar 8 06:02:06 2011
@@ -153,15 +153,15 @@
<title>Trace Builder</title>
<p><code>Command:</code></p>
- <source>java org.apache.hadoop.tools.rumen.TraceBuilder [options] [jobtrace-output] [topology-output] [input]</source>
-
+ <source>java org.apache.hadoop.tools.rumen.TraceBuilder [options] <jobtrace-output> <topology-output> <inputs></source>
+
<p>This command invokes the <code>TraceBuilder</code> utility of
<em>Rumen</em>. It converts the JobHistory files into a series of JSON
- objects and output them in the <code>[jobtrace-output]</code> file.
- It also extracts the cluster layout (topology) and outputs it in the
- <code>[topology-output]</code> file.
- <code>[input]</code> represents a space separated list of JobHistory
- files and folders.
+ objects and writes them into the <code><jobtrace-output></code>
+ file. It also extracts the cluster layout (topology) and writes it in
+ the <code><topology-output></code> file.
+ <code><inputs></code> represents a space-separated list of
+ JobHistory files and folders.
</p>
<note>1) Input and output to <code>TraceBuilder</code> is expected to
@@ -173,9 +173,12 @@
regular expressions.
</note>
<note>
- 2) TraceBuilder does not recursively scan the input folder for
- job history files. Only the files that are directly placed under
- the input folder will be considered for generating the trace.
+ 2) By default, TraceBuilder does not recursively scan the input
+ folder for job history files. Only the files that are directly
+ placed under the input folder will be considered for generating
+ the trace. To add all the files under the input directory by
+ recursively scanning the input directory, use the "-recursive"
+ option.
</note>
<p>Cluster topology is used as follows :</p>
@@ -208,6 +211,15 @@
from the source files.
</td>
</tr>
+ <tr>
+ <td><code>-recursive</code></td>
+ <td>Recursively traverse input paths for job history logs.</td>
+ <td>This option should be used to inform the TraceBuilder to
+ recursively scan the input paths and process all the files under it.
+ Note that, by default, only the history logs that are directly under
+ the input folder are considered for generating the trace.
+ </td>
+ </tr>
</table>
<section>
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1079266&r1=1079265&r2=1079266&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Tue Mar 8 06:02:06 2011
@@ -345,61 +345,6 @@ public class TestRumenJobTraces {
}
/**
- * Test if {@link TraceBuilder} can process globbed input file paths.
- */
- @Test
- public void testGlobbedInput() throws Exception {
- final Configuration conf = new Configuration();
- final FileSystem lfs = FileSystem.getLocal(conf);
-
- // define the test's root temporary directory
- final Path rootTempDir =
- new Path(System.getProperty("test.build.data", "/tmp"))
- .makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
- // define the test's root input directory
- Path testRootInputDir = new Path(rootTempDir, "TestGlobbedInputPath");
- // define the nested input directory
- Path nestedInputDir = new Path(testRootInputDir, "1/2/3/4");
- // define the globbed version of the nested input directory
- Path globbedInputNestedDir =
- lfs.makeQualified(new Path(testRootInputDir, "*/*/*/*/*"));
-
- // define a file in the nested test input directory
- Path inputPath1 = new Path(nestedInputDir, "test.txt");
- // define a sub-folder in the nested test input directory
- Path inputPath2Parent = new Path(nestedInputDir, "test");
- lfs.mkdirs(inputPath2Parent);
- // define a file in the sub-folder within the nested test input directory
- Path inputPath2 = new Path(inputPath2Parent, "test.txt");
-
- // create empty input files
- lfs.createNewFile(inputPath1);
- lfs.createNewFile(inputPath2);
-
- // define the output trace and topology files
- Path outputTracePath = new Path(testRootInputDir, "test.json");
- Path outputTopologyTracePath = new Path(testRootInputDir, "topology.json");
-
- String[] args =
- new String[] {outputTracePath.toString(),
- outputTopologyTracePath.toString(),
- globbedInputNestedDir.toString() };
-
- // invoke TraceBuilder's MyOptions command options parsing module/utility
- MyOptions options = new TraceBuilder.MyOptions(args, conf);
-
- lfs.delete(testRootInputDir, true);
-
- assertEquals("Error in detecting globbed input FileSystem paths",
- 2, options.inputs.size());
-
- assertTrue("Missing input file " + inputPath1,
- options.inputs.contains(inputPath1));
- assertTrue("Missing input file " + inputPath2,
- options.inputs.contains(inputPath2));
- }
-
- /**
* Test if {@link CurrentJHParser} can read events from current JH files.
*/
@Test
@@ -519,6 +464,163 @@ public class TestRumenJobTraces {
validateJobConfParser("sample-conf.file.new.xml", true);
}
+ /**
+ * Check if processing of input arguments is as expected by passing globbed
+ * input path
+ * <li> without -recursive option and
+ * <li> with -recursive option.
+ */
+ @Test
+ public void testProcessInputArgument() throws Exception {
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+
+ // define the test's root temporary directory
+ final Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp"))
+ .makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
+ // define the test's root input directory
+ Path testRootInputDir = new Path(rootTempDir, "TestProcessInputArgument");
+ // define the nested input directory
+ Path nestedInputDir = new Path(testRootInputDir, "1/2/3/4");
+ // define the globbed version of the nested input directory
+ Path globbedInputNestedDir =
+ lfs.makeQualified(new Path(testRootInputDir, "*/*/*/*/*"));
+ try {
+ lfs.delete(nestedInputDir, true);
+
+ List<String> recursiveInputPaths = new ArrayList<String>();
+ List<String> nonRecursiveInputPaths = new ArrayList<String>();
+ // Create input files under the given path with multiple levels of
+ // sub directories
+ createHistoryLogsHierarchy(nestedInputDir, lfs, recursiveInputPaths,
+ nonRecursiveInputPaths);
+
+ // Check the case of globbed input path and without -recursive option
+ List<Path> inputs = MyOptions.processInputArgument(
+ globbedInputNestedDir.toString(), conf, false);
+ validateHistoryLogPaths(inputs, nonRecursiveInputPaths);
+
+ // Check the case of globbed input path and with -recursive option
+ inputs = MyOptions.processInputArgument(
+ globbedInputNestedDir.toString(), conf, true);
+ validateHistoryLogPaths(inputs, recursiveInputPaths);
+
+ } finally {
+ lfs.delete(testRootInputDir, true);
+ }
+ }
+
+ /**
+ * Validate if the input history log paths are as expected.
+ * @param inputs the resultant input paths to be validated
+ * @param expectedHistoryFileNames the expected input history logs
+ * @throws IOException
+ */
+ private void validateHistoryLogPaths(List<Path> inputs,
+ List<String> expectedHistoryFileNames) throws IOException {
+
+ System.out.println("\nExpected history files are:");
+ for (String historyFile : expectedHistoryFileNames) {
+ System.out.println(historyFile);
+ }
+
+ System.out.println("\nResultant history files are:");
+ List<String> historyLogs = new ArrayList<String>();
+ for (Path p : inputs) {
+ historyLogs.add(p.toUri().getPath());
+ System.out.println(p.toUri().getPath());
+ }
+
+ assertEquals("Number of history logs found is different from the expected.",
+ expectedHistoryFileNames.size(), inputs.size());
+
+ // Verify if all the history logs are expected ones and they are in the
+ // expected order
+ assertTrue("Some of the history log files do not match the expected.",
+ historyLogs.equals(expectedHistoryFileNames));
+ }
+
+ /**
+ * Create history logs under the given path with multiple levels of
+ * sub directories as shown below.
+ * <br>
+ * Create a file, an empty subdirectory and a nonempty subdirectory
+ * <historyDir> under the given input path.
+ * <br>
+ * The subdirectory <historyDir> contains the following dir structure:
+ * <br>
+ * <br><historyDir>/historyFile1.txt
+ * <br><historyDir>/historyFile1.gz
+ * <br><historyDir>/subDir1/historyFile2.txt
+ * <br><historyDir>/subDir1/historyFile2.gz
+ * <br><historyDir>/subDir2/historyFile3.txt
+ * <br><historyDir>/subDir2/historyFile3.gz
+ * <br><historyDir>/subDir1/subDir11/historyFile4.txt
+ * <br><historyDir>/subDir1/subDir11/historyFile4.gz
+ * <br><historyDir>/subDir2/subDir21/
+ * <br>
+ * Create the lists of input paths that should be processed by TraceBuilder
+ * for recursive case and non-recursive case.
+ * @param nestedInputDir the input history logs directory where history files
+ * with nested subdirectories are created
+ * @param fs FileSystem of the input paths
+ * @param recursiveInputPaths input paths for recursive case
+ * @param nonRecursiveInputPaths input paths for non-recursive case
+ * @throws IOException
+ */
+ private void createHistoryLogsHierarchy(Path nestedInputDir, FileSystem fs,
+ List<String> recursiveInputPaths, List<String> nonRecursiveInputPaths)
+ throws IOException {
+ List<Path> dirs = new ArrayList<Path>();
+ // define a file in the nested test input directory
+ Path inputPath1 = new Path(nestedInputDir, "historyFile.txt");
+ // define an empty sub-folder in the nested test input directory
+ Path emptyDir = new Path(nestedInputDir, "emptyDir");
+ // define a nonempty sub-folder in the nested test input directory
+ Path historyDir = new Path(nestedInputDir, "historyDir");
+
+ fs.mkdirs(nestedInputDir);
+ // Create an empty input file
+ fs.createNewFile(inputPath1);
+ // Create empty subdir
+ fs.mkdirs(emptyDir);// let us not create any files under this dir
+
+ fs.mkdirs(historyDir);
+ dirs.add(historyDir);
+
+ Path subDir1 = new Path(historyDir, "subDir1");
+ fs.mkdirs(subDir1);
+ dirs.add(subDir1);
+ Path subDir2 = new Path(historyDir, "subDir2");
+ fs.mkdirs(subDir2);
+ dirs.add(subDir2);
+
+ Path subDir11 = new Path(subDir1, "subDir11");
+ fs.mkdirs(subDir11);
+ dirs.add(subDir11);
+ Path subDir21 = new Path(subDir2, "subDir21");
+ fs.mkdirs(subDir21);// let us not create any files under this dir
+
+ int i = 0;
+ for (Path dir : dirs) {
+ i++;
+ Path gzPath = new Path(dir, "historyFile" + i + ".gz");
+ Path txtPath = new Path(dir, "historyFile" + i + ".txt");
+ fs.createNewFile(txtPath);
+ fs.createNewFile(gzPath);
+ recursiveInputPaths.add(gzPath.toUri().getPath());
+ recursiveInputPaths.add(txtPath.toUri().getPath());
+ if (i == 1) {
+ nonRecursiveInputPaths.add(gzPath.toUri().getPath());
+ nonRecursiveInputPaths.add(txtPath.toUri().getPath());
+ }
+ }
+ recursiveInputPaths.add(inputPath1.toUri().getPath());
+ nonRecursiveInputPaths.add(inputPath1.toUri().getPath());
+ }
+
+
private void validateJobConfParser(String confFile, boolean newConfig)
throws Exception {
final Configuration conf = new Configuration();
Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java?rev=1079266&r1=1079265&r2=1079266&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java Tue Mar 8 06:02:06 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
@@ -34,7 +35,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
@@ -67,69 +70,110 @@ public class TraceBuilder extends Config
IOException, ClassNotFoundException {
int switchTop = 0;
+ // to determine if the input paths should be recursively scanned or not
+ boolean doRecursiveTraversal = false;
+
while (args[switchTop].startsWith("-")) {
if (args[switchTop].equalsIgnoreCase("-demuxer")) {
inputDemuxerClass =
Class.forName(args[++switchTop]).asSubclass(InputDemuxer.class);
-
- ++switchTop;
+ } else if (args[switchTop].equalsIgnoreCase("-recursive")) {
+ doRecursiveTraversal = true;
}
+ ++switchTop;
}
traceOutput = new Path(args[0 + switchTop]);
topologyOutput = new Path(args[1 + switchTop]);
for (int i = 2 + switchTop; i < args.length; ++i) {
- processInputArguments(args[i], conf);
+ inputs.addAll(processInputArgument(
+ args[i], conf, doRecursiveTraversal));
+ }
+ }
+
+ /**
+ * Compare the history file names, not the full paths.
+ * Job history file name format is such that doing lexicographic sort on the
+ * history file names should result in the order of jobs' submission times.
+ */
+ private static class HistoryLogsComparator
+ implements Comparator<FileStatus> {
+ @Override
+ public int compare(FileStatus file1, FileStatus file2) {
+ return file1.getPath().getName().compareTo(
+ file2.getPath().getName());
}
}
-
- /** Processes the input file/folder arguments. If the input is a file then
- * it is directly considered for further processing. If the input is a
- * folder, then all the files in the input folder are considered for
- * further processing.
+
+ /**
+ * Processes the input file/folder argument. If the input is a file,
+ * then it is directly considered for further processing by TraceBuilder.
+ * If the input is a folder, then all the history logs in the
+ * input folder are considered for further processing.
+ *
+ * If isRecursive is true, then the input path is recursively scanned
+ * for job history logs for further processing by TraceBuilder.
*
- * NOTE: If the input represents a globbed path, then it is first flattened
- * and then the individual paths represented by the globbed input
- * path are processed.
+ * NOTE: If the input represents a globbed path, then it is first flattened
+ * and then the individual paths represented by the globbed input
+ * path are considered for further processing.
+ *
+ * @param input input path, possibly globbed
+ * @param conf configuration
+ * @param isRecursive whether to recursively traverse the input paths to
+ * find history logs
+ * @return the input history log files' paths
+ * @throws FileNotFoundException
+ * @throws IOException
*/
- private void processInputArguments(String input, Configuration conf)
- throws IOException {
+ static List<Path> processInputArgument(String input, Configuration conf,
+ boolean isRecursive) throws FileNotFoundException, IOException {
Path inPath = new Path(input);
FileSystem fs = inPath.getFileSystem(conf);
FileStatus[] inStatuses = fs.globStatus(inPath);
-
+
+ List<Path> inputPaths = new LinkedList<Path>();
if (inStatuses == null || inStatuses.length == 0) {
- return;
+ return inputPaths;
}
-
+
for (FileStatus inStatus : inStatuses) {
Path thisPath = inStatus.getPath();
if (inStatus.isDirectory()) {
- FileStatus[] statuses = fs.listStatus(thisPath);
-
- List<String> dirNames = new ArrayList<String>();
- for (FileStatus s : statuses) {
- if (s.isDirectory()) continue;
- String name = s.getPath().getName();
+ // Find list of files in this path(recursively if -recursive option
+ // is specified).
+ List<FileStatus> historyLogs = new ArrayList<FileStatus>();
+
+ RemoteIterator<LocatedFileStatus> iter =
+ fs.listFiles(thisPath, isRecursive);
+ while (iter.hasNext()) {
+ LocatedFileStatus child = iter.next();
+ String fileName = child.getPath().getName();
- if (!(name.endsWith(".crc") || name.startsWith("."))) {
- dirNames.add(name);
+ if (!(fileName.endsWith(".crc") || fileName.startsWith("."))) {
+ historyLogs.add(child);
}
}
- String[] sortableNames = dirNames.toArray(new String[1]);
+ if (historyLogs.size() > 0) {
+ // Add the sorted history log file names in this path to the
+ // inputPaths list
+ FileStatus[] sortableNames = historyLogs.toArray(
+ new FileStatus[historyLogs.size()]);
+ Arrays.sort(sortableNames, new HistoryLogsComparator());
- Arrays.sort(sortableNames);
-
- for (String dirName : sortableNames) {
- inputs.add(new Path(thisPath, dirName));
+ for (FileStatus historyLog : sortableNames) {
+ inputPaths.add(historyLog.getPath());
+ }
}
} else {
- inputs.add(thisPath);
+ inputPaths.add(thisPath);
}
}
+
+ return inputPaths;
}
}