You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:38:25 UTC
svn commit: r1077631 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/
test/org/apache/hadoop/mapred/ test/org/apache/hadoop/tools/rumen/
tools/org/apache/hadoop/tools/ru...
Author: omalley
Date: Fri Mar 4 04:38:25 2011
New Revision: 1077631
URL: http://svn.apache.org/viewvc?rev=1077631&view=rev
Log:
commit 0398a9423c4d8fbb42e00f05911f44a8d18efb11
Author: Amar Ramesh Kamat <am...@yahoo-inc.com>
Date: Wed Aug 4 02:24:26 2010 +0530
MAPREDUCE:1865 Rumen can now process old and new jobhistory files.
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobID.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java
hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=1077631&r1=1077630&r2=1077631&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri Mar 4 04:38:25 2011
@@ -168,6 +168,13 @@ public class JobHistory {
private static final SortedMap<Long, String>jobToDirectoryMap
= new TreeMap<Long, String>();
+ // JobHistory filename regex; the jobid is capturing group 1
+ public static final Pattern JOBHISTORY_FILENAME_REGEX =
+ Pattern.compile("(" + JobID.JOBID_REGEX + ")_.+");
+ // JobHistory conf-filename regex; '.' is escaped so only a literal "_conf.xml" matches
+ public static final Pattern CONF_FILENAME_REGEX =
+ Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf\\.xml");
+
private static class MovedFileInfo {
private final String historyFile;
private final long timestamp;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobID.java?rev=1077631&r1=1077630&r2=1077631&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobID.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobID.java Fri Mar 4 04:38:25 2011
@@ -47,6 +47,11 @@ import org.apache.hadoop.io.Text;
public class JobID extends org.apache.hadoop.mapred.ID
implements Comparable<ID> {
protected static final String JOB = "job";
+
+ // Jobid regex (JOB + SEPARATOR + digits + SEPARATOR + digits) for tools and framework components
+ public static final String JOBID_REGEX =
+ JOB + SEPARATOR + "[0-9]+" + SEPARATOR + "[0-9]+";
+
private final Text jtIdentifier;
protected static final NumberFormat idFormat = NumberFormat.getInstance();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1077631&r1=1077630&r2=1077631&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Mar 4 04:38:25 2011
@@ -583,8 +583,8 @@ public class UtilsForTests {
}
// Start a job and return its RunningJob object
- static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps,
- int numReds) throws IOException {
+ public static RunningJob runJob(JobConf conf, Path inDir, Path outDir,
+ int numMaps, int numReds) throws IOException {
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outDir)) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1077631&r1=1077630&r2=1077631&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Fri Mar 4 04:38:25 2011
@@ -26,6 +26,7 @@ import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -36,6 +37,12 @@ import org.apache.hadoop.io.compress.Cod
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobHistory;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.UtilsForTests;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
@@ -323,6 +330,146 @@ public class TestRumenJobTraces {
}
}
+ /**
+ * Tests if {@link TraceBuilder} can correctly identify and parse jobhistory
+ * filenames. The testcase checks if {@link TraceBuilder}
+ * - correctly identifies a jobhistory filename
+ * - correctly parses a jobhistory filename to extract out the jobid
+ * - correctly identifies a job-configuration filename stored along with the
+ * jobhistory files
+ */
+ @Test
+ public void testJobHistoryFilenameParsing() throws IOException {
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+ String user = "test"; // NOTE(review): not referenced again in this test
+ org.apache.hadoop.mapred.JobID jid =
+ new org.apache.hadoop.mapred.JobID("12345", 1);
+ final Path rootInputDir =
+ new Path(System.getProperty("test.tools.input.dir", ""))
+ .makeQualified(lfs); // NOTE(review): rootInputDir is not used below
+
+ // Check that a current-format jobhistory filename yields the original jobid
+ Path jhFilename = new Path(jid + "_1234_user_jobname");
+ JobID extractedJID =
+ JobID.forName(TraceBuilder.extractJobID(jhFilename.getName()));
+ assertEquals("TraceBuilder failed to parse the current JH filename",
+ jid, extractedJID);
+
+ // Check that a conf filename stored with the jobhistory files is detected
+ Path jhConfFilename = new Path(jid + "_conf.xml");
+ assertTrue("TraceBuilder failed to parse the current JH conf filename",
+ TraceBuilder.isJobConfXml(jhConfFilename.getName(), null));
+ }
+
+ /**
+ * Test if {@link CurrentJHParser} can read events from current JH files.
+ */
+ @Test
+ public void testCurrentJHParser() throws Exception {
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+
+ final Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp"))
+ .makeQualified(lfs);
+
+ final Path tempDir = new Path(rootTempDir, "TestCurrentJHParser");
+ lfs.delete(tempDir, true);
+
+ // Run a MR job
+ // create a MR cluster
+ conf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
+ conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
+ MiniMRCluster mrCluster = new MiniMRCluster(1, "file:///", 1, null, null,
+ new JobConf(conf));
+
+ // run a job
+ Path inDir = new Path(tempDir, "input");
+ Path outDir = new Path(tempDir, "output");
+ JobHistoryParser parser = null;
+ RewindableInputStream ris = null;
+ ArrayList<String> seenEvents = new ArrayList<String>(10);
+ RunningJob rJob = null;
+
+ try {
+ // construct a job with 1 map and 1 reduce task.
+ rJob = UtilsForTests.runJob(mrCluster.createJobConf(), inDir, outDir, 1,
+ 1);
+ rJob.waitForCompletion();
+ assertTrue("Job failed", rJob.isSuccessful());
+
+ JobID id = rJob.getID();
+
+ // get the jobhistory filepath
+ Path inputPath =
+ new Path(JobHistory.getHistoryFilePath(
+ org.apache.hadoop.mapred.JobID.downgrade(id)));
+ // poll (100 x 100ms, i.e. up to 10 secs) for the jobhistory file to move into the done folder
+ for (int i = 0; i < 100; ++i) {
+ if (lfs.exists(inputPath)) {
+ break;
+ }
+ TimeUnit.MILLISECONDS.sleep(100); // sleep(); Object.wait() would need the monitor
+ }
+
+ assertTrue("Missing job history file", lfs.exists(inputPath));
+
+ InputDemuxer inputDemuxer = new DefaultInputDemuxer();
+ inputDemuxer.bindTo(inputPath, conf);
+
+ Pair<String, InputStream> filePair = inputDemuxer.getNext();
+
+ assertNotNull(filePair);
+
+ ris = new RewindableInputStream(filePair.second());
+
+ // Test if the JobHistoryParserFactory can detect the parser correctly
+ parser = JobHistoryParserFactory.getParser(ris);
+
+ // record the type of every event, in the order the parser returns them
+ HistoryEvent e;
+ while ((e = parser.nextEvent()) != null) {
+ String eventString = e.getEventType().toString();
+ System.out.println(eventString);
+ seenEvents.add(eventString);
+ }
+ } finally {
+ // stop the MR cluster
+ mrCluster.shutdown();
+
+ if (ris != null) {
+ ris.close();
+ }
+ if (parser != null) {
+ parser.close();
+ }
+
+ // cleanup the filesystem
+ lfs.delete(tempDir, true);
+ }
+
+ // Check against the gold standard
+ System.out.println("testCurrentJHParser validating using gold std ");
+ String[] goldLines = new String[] {"JOB_SUBMITTED", "JOB_PRIORITY_CHANGED",
+ "JOB_STATUS_CHANGED", "JOB_INITED", "JOB_INFO_CHANGED", "TASK_STARTED",
+ "MAP_ATTEMPT_STARTED", "MAP_ATTEMPT_FINISHED", "MAP_ATTEMPT_FINISHED",
+ "TASK_UPDATED", "TASK_FINISHED", "JOB_STATUS_CHANGED", "TASK_STARTED",
+ "MAP_ATTEMPT_STARTED", "MAP_ATTEMPT_FINISHED", "MAP_ATTEMPT_FINISHED",
+ "TASK_UPDATED", "TASK_FINISHED", "TASK_STARTED", "MAP_ATTEMPT_STARTED",
+ "MAP_ATTEMPT_FINISHED", "REDUCE_ATTEMPT_FINISHED", "TASK_UPDATED",
+ "TASK_FINISHED", "TASK_STARTED", "MAP_ATTEMPT_STARTED",
+ "MAP_ATTEMPT_FINISHED", "MAP_ATTEMPT_FINISHED", "TASK_UPDATED",
+ "TASK_FINISHED", "JOB_STATUS_CHANGED", "JOB_FINISHED"};
+
+ // Check the output with gold std
+ assertEquals("Size mismatch", goldLines.length, seenEvents.size());
+
+ int index = 0;
+ for (String goldLine : goldLines) {
+ assertEquals("Content mismatch", goldLine, seenEvents.get(index++));
+ }
+ }
+
@Test
public void testJobConfigurationParser() throws Exception {
String[] list1 =
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java?rev=1077631&r1=1077630&r2=1077631&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java Fri Mar 4 04:38:25 2011
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.tools.rumen;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.mapreduce.JobID;
+
/**
*
*
@@ -45,5 +49,20 @@ public class Pre21JobHistoryConstants {
public static enum Values {
SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING, PREP, SETUP
}
+
+ /**
+ * Pre21 regex for jobhistory filename
+ * i.e. jt-identifier_job-id_user-name_job-name (job-id is capturing group 1)
+ */
+ static final Pattern JOBHISTORY_FILENAME_REGEX =
+ Pattern.compile("[^.].+_(" + JobID.JOBID_REGEX + ")_.+");
+
+ /**
+ * Pre21 regex for jobhistory conf filename
+ * i.e. jt-identifier_job-id_conf.xml, optionally followed by ".suffix"
+ */
+ static final Pattern CONF_FILENAME_REGEX =
+ Pattern.compile("[^.].+_(" + JobID.JOBID_REGEX
+ + ")_conf\\.xml(?:\\.[0-9a-zA-Z]+)?");
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java?rev=1077631&r1=1077630&r2=1077631&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java Fri Mar 4 04:38:25 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobHistory;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -52,20 +53,6 @@ public class TraceBuilder extends Config
Outputter<LoggedJob> traceWriter;
Outputter<LoggedNetworkTopology> topologyWriter;
- // Needs to be interpreted greedily or otherwise constrained
- static final String jobIDRegex = "job_[0-9]+_[0-9]+";
-
- // returns jobID in Capturing Group 1
- static final Pattern confFileNameRegex =
- Pattern.compile("[^.].+_(" + jobIDRegex
- + ")_conf.xml(?:\\.[0-9a-zA-Z]+)?");
-
- // This can match text that confFileNameRegex will also match. The code
- // gives precedence to confFileNameRegex . Returns jobID
- // in Capturing Group 1
- static final Pattern jobFileNameRegex =
- Pattern.compile("[^.].+_(" + jobIDRegex + ")_.+");
-
static class MyOptions {
Class<? extends InputDemuxer> inputDemuxerClass = DefaultInputDemuxer.class;
@@ -163,11 +150,23 @@ public class TraceBuilder extends Config
* [especially for .crc files] we return null.
*/
static String extractJobID(String fileName) {
- return applyParser(fileName, jobFileNameRegex);
+ String jobId = applyParser(fileName, JobHistory.JOBHISTORY_FILENAME_REGEX);
+ if (jobId == null) {
+ // no match for the current format; check if it's a pre21 jobhistory file
+ jobId = applyParser(fileName,
+ Pre21JobHistoryConstants.JOBHISTORY_FILENAME_REGEX);
+ }
+ return jobId;
}
static boolean isJobConfXml(String fileName, InputStream input) {
- return applyParser(fileName, confFileNameRegex) != null;
+ String jobId = applyParser(fileName, JobHistory.CONF_FILENAME_REGEX);
+ if (jobId == null) {
+ // no match for the current format; check if it's a pre21 jobhistory conf file
+ jobId = applyParser(fileName,
+ Pre21JobHistoryConstants.CONF_FILENAME_REGEX);
+ }
+ return jobId != null;
}
private void addInterestedProperties(List<String> interestedProperties,