You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2010/07/16 08:35:26 UTC
svn commit: r964692 - in /hadoop/mapreduce/trunk: ./
src/java/org/apache/hadoop/mapreduce/
src/java/org/apache/hadoop/mapreduce/jobhistory/
src/test/mapred/org/apache/hadoop/tools/rumen/
src/tools/org/apache/hadoop/tools/rumen/
Author: amareshwari
Date: Fri Jul 16 06:35:26 2010
New Revision: 964692
URL: http://svn.apache.org/viewvc?rev=964692&view=rev
Log:
MAPREDUCE-1865. Rumen should also support jobhistory files generated using trunk. Contributed by Amar Kamat
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobID.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=964692&r1=964691&r2=964692&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul 16 06:35:26 2010
@@ -167,6 +167,9 @@ Trunk (unreleased changes)
HADOOP-6845. Also, introduces Credentials in JobConf, and in JobContext.
(Jitendra Pandey and Arun Murthy via ddas)
+ MAPREDUCE-1865. Rumen should also support jobhistory files generated using
+ trunk. (Amar Kamat via amareshwari)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobID.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobID.java?rev=964692&r1=964691&r2=964692&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobID.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobID.java Fri Jul 16 06:35:26 2010
@@ -51,6 +51,11 @@ import org.apache.hadoop.io.Text;
public class JobID extends org.apache.hadoop.mapred.ID
implements Comparable<ID> {
protected static final String JOB = "job";
+
+ // Jobid regex for various tools and framework components
+ public static final String JOBID_REGEX =
+ JOB + SEPARATOR + "[0-9]+" + SEPARATOR + "[0-9]+";
+
private final Text jtIdentifier;
protected static final NumberFormat idFormat = NumberFormat.getInstance();
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java?rev=964692&r1=964691&r2=964692&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java Fri Jul 16 06:35:26 2010
@@ -32,6 +32,7 @@ import java.util.Map.Entry;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -92,6 +93,13 @@ public class JobHistory {
Collections.<JobID,MovedFileInfo>synchronizedMap(
new LinkedHashMap<JobID, MovedFileInfo>());
+ // JobHistory filename regex
+ public static final Pattern JOBHISTORY_FILENAME_REGEX =
+ Pattern.compile("(" + JobID.JOBID_REGEX + ")_.+");
+ // JobHistory conf-filename regex
+ public static final Pattern CONF_FILENAME_REGEX =
+ Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
+
private static class MovedFileInfo {
private final String historyFile;
private final long timestamp;
@@ -372,13 +380,20 @@ public class JobHistory {
return jobFilePath;
}
+ /**
+ * Generates a suffix for old/stale jobhistory files
+ * Pattern : . + identifier + .old
+ */
+ public static String getOldFileSuffix(String identifier) {
+ return "." + identifier + JobHistory.OLD_SUFFIX;
+ }
+
private void moveOldFiles() throws IOException {
//move the log files remaining from last run to the DONE folder
//suffix the file name based on Job tracker identifier so that history
//files with same job id don't get over written in case of recovery.
FileStatus[] files = logDirFs.listStatus(logDir);
- String jtIdentifier = jobTracker.getTrackerIdentifier();
- String fileSuffix = "." + jtIdentifier + JobHistory.OLD_SUFFIX;
+ String fileSuffix = getOldFileSuffix(jobTracker.getTrackerIdentifier());
for (FileStatus fileStatus : files) {
Path fromPath = fileStatus.getPath();
if (fromPath.equals(done)) { //DONE can be a subfolder of log dir
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=964692&r1=964691&r2=964692&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Fri Jul 16 06:35:26 2010
@@ -26,6 +26,7 @@ import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -36,13 +37,23 @@ import org.apache.hadoop.io.compress.Cod
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup.MyOutputFormat;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -324,6 +335,165 @@ public class TestRumenJobTraces {
}
}
+ /**
+ * Tests if {@link TraceBuilder} can correctly identify and parse jobhistory
+ * filenames. The testcase checks if {@link TraceBuilder}
+ * - correctly identifies a jobhistory filename without suffix
+ * - correctly parses a jobhistory filename without suffix to extract out
+ * the jobid
+ * - correctly identifies a jobhistory filename with suffix
+ * - correctly parses a jobhistory filename with suffix to extract out the
+ * jobid
+ * - correctly identifies a job-configuration filename stored along with the
+ * jobhistory files
+ */
+ @Test
+ public void testJobHistoryFilenameParsing() throws IOException {
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+ String user = "test";
+ org.apache.hadoop.mapred.JobID jid =
+ new org.apache.hadoop.mapred.JobID("12345", 1);
+ final Path rootInputDir =
+ new Path(System.getProperty("test.tools.input.dir", ""))
+ .makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
+
+ // Check if jobhistory filenames are detected properly
+ Path jhFilename = JobHistory.getJobHistoryFile(rootInputDir, jid, user);
+ JobID extractedJID =
+ JobID.forName(TraceBuilder.extractJobID(jhFilename.getName()));
+ assertEquals("TraceBuilder failed to parse the current JH filename",
+ jid, extractedJID);
+ // test jobhistory filename with old/stale file suffix
+ jhFilename = jhFilename.suffix(JobHistory.getOldFileSuffix("123"));
+ extractedJID =
+ JobID.forName(TraceBuilder.extractJobID(jhFilename.getName()));
+ assertEquals("TraceBuilder failed to parse the current JH filename"
+ + "(old-suffix)",
+ jid, extractedJID);
+
+ // Check if the conf filename in jobhistory is detected properly
+ Path jhConfFilename = JobHistory.getConfFile(rootInputDir, jid);
+ assertTrue("TraceBuilder failed to parse the current JH conf filename",
+ TraceBuilder.isJobConfXml(jhConfFilename.getName(), null));
+ // test jobhistory conf filename with old/stale file suffix
+ jhConfFilename = jhConfFilename.suffix(JobHistory.getOldFileSuffix("123"));
+ assertTrue("TraceBuilder failed to parse the current JH conf filename"
+ + " (old suffix)",
+ TraceBuilder.isJobConfXml(jhConfFilename.getName(), null));
+ }
+
+ /**
+ * Test if {@link CurrentJHParser} can read events from current JH files.
+ */
+ @Test
+ public void testCurrentJHParser() throws Exception {
+ final Configuration conf = new Configuration();
+ final FileSystem lfs = FileSystem.getLocal(conf);
+
+ final Path rootTempDir =
+ new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+ lfs.getUri(), lfs.getWorkingDirectory());
+
+ final Path tempDir = new Path(rootTempDir, "TestCurrentJHParser");
+ lfs.delete(tempDir, true);
+
+ // Run a MR job
+ // create a MR cluster
+ conf.setInt(TTConfig.TT_MAP_SLOTS, 1);
+ conf.setInt(TTConfig.TT_REDUCE_SLOTS, 1);
+ MiniMRCluster mrCluster = new MiniMRCluster(1, "file:///", 1, null, null,
+ new JobConf(conf));
+
+ // run a job
+ Path inDir = new Path(tempDir, "input");
+ Path outDir = new Path(tempDir, "output");
+ JobHistoryParser parser = null;
+ RewindableInputStream ris = null;
+ ArrayList<String> seenEvents = new ArrayList<String>(10);
+
+ try {
+ JobConf jConf = mrCluster.createJobConf();
+ // construct a job with 1 map and 1 reduce task.
+ Job job = MapReduceTestUtil.createJob(jConf, inDir, outDir, 1, 1);
+ // disable setup/cleanup
+ job.setJobSetupCleanupNeeded(false);
+ // set the output format to take care of the _temporary folder
+ job.setOutputFormatClass(MyOutputFormat.class);
+ // wait for the job to complete
+ job.waitForCompletion(false);
+
+ assertTrue("Job failed", job.isSuccessful());
+
+ JobID id = job.getJobID();
+ JobClient jc = new JobClient(jConf);
+ String user = jc.getAllJobs()[0].getUsername();
+
+ // get the jobhistory filepath
+ Path jhPath =
+ new Path(mrCluster.getJobTrackerRunner().getJobTracker()
+ .getJobHistoryDir());
+ Path inputPath = JobHistory.getJobHistoryFile(jhPath, id, user);
+ // wait for 10 secs for the jobhistory file to move into the done folder
+ for (int i = 0; i < 100; ++i) {
+ if (lfs.exists(inputPath)) {
+ break;
+ }
+ TimeUnit.MILLISECONDS.wait(100);
+ }
+
+ assertTrue("Missing job history file", lfs.exists(inputPath));
+
+ InputDemuxer inputDemuxer = new DefaultInputDemuxer();
+ inputDemuxer.bindTo(inputPath, conf);
+
+ Pair<String, InputStream> filePair = inputDemuxer.getNext();
+
+ assertNotNull(filePair);
+
+ ris = new RewindableInputStream(filePair.second());
+
+ // Test if the JobHistoryParserFactory can detect the parser correctly
+ parser = JobHistoryParserFactory.getParser(ris);
+
+ HistoryEvent e;
+ while ((e = parser.nextEvent()) != null) {
+ String eventString = e.getEventType().toString();
+ System.out.println(eventString);
+ seenEvents.add(eventString);
+ }
+ } finally {
+ // stop the MR cluster
+ mrCluster.shutdown();
+
+ if (ris != null) {
+ ris.close();
+ }
+ if (parser != null) {
+ parser.close();
+ }
+
+ // cleanup the filesystem
+ lfs.delete(tempDir, true);
+ }
+
+ // Check against the gold standard
+ System.out.println("testCurrentJHParser validating using gold std ");
+ String[] goldLines = new String[] {"JOB_SUBMITTED", "JOB_PRIORITY_CHANGED",
+ "JOB_INITED", "JOB_STATUS_CHANGED", "TASK_STARTED",
+ "MAP_ATTEMPT_STARTED", "MAP_ATTEMPT_FINISHED", "TASK_FINISHED",
+ "TASK_STARTED", "REDUCE_ATTEMPT_STARTED", "REDUCE_ATTEMPT_FINISHED",
+ "TASK_FINISHED", "JOB_FINISHED"};
+
+ // Check the output with gold std
+ assertEquals("Size mismatch", goldLines.length, seenEvents.size());
+
+ int index = 0;
+ for (String goldLine : goldLines) {
+ assertEquals("Content mismatch", goldLine, seenEvents.get(index++));
+ }
+ }
+
@Test
public void testJobConfigurationParser() throws Exception {
String[] list1 =
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java?rev=964692&r1=964691&r2=964692&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java Fri Jul 16 06:35:26 2010
@@ -26,6 +26,7 @@ import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
@@ -403,6 +404,12 @@ public class JobBuilder {
return Values.SETUP;
}
+ // Note that pre-21, the task state of a successful task was logged as
+ // SUCCESS while from 21 onwards, it's logged as SUCCEEDED.
+ if (name.equalsIgnoreCase(TaskStatus.State.SUCCEEDED.toString())) {
+ return Values.SUCCESS;
+ }
+
return Values.valueOf(name.toUpperCase());
}
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java?rev=964692&r1=964691&r2=964692&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java Fri Jul 16 06:35:26 2010
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.tools.rumen;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.mapreduce.JobID;
+
/**
*
*
@@ -45,5 +49,20 @@ public class Pre21JobHistoryConstants {
public static enum Values {
SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING, PREP, SETUP
}
+
+ /**
+ * Pre21 regex for jobhistory filename
+ * i.e. jt-identifier_job-id_user-name_job-name
+ */
+ static final Pattern JOBHISTORY_FILENAME_REGEX =
+ Pattern.compile("[^.].+_(" + JobID.JOBID_REGEX + ")_.+");
+
+ /**
+ * Pre21 regex for jobhistory conf filename
+ * i.e. jt-identifier_job-id_conf.xml
+ */
+ static final Pattern CONF_FILENAME_REGEX =
+ Pattern.compile("[^.].+_(" + JobID.JOBID_REGEX
+ + ")_conf.xml(?:\\.[0-9a-zA-Z]+)?");
}
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java?rev=964692&r1=964691&r2=964692&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java Fri Jul 16 06:35:26 2010
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -53,20 +54,6 @@ public class TraceBuilder extends Config
Outputter<LoggedJob> traceWriter;
Outputter<LoggedNetworkTopology> topologyWriter;
- // Needs to be interpreted greedily or otherwise constrained
- static final String jobIDRegex = "job_[0-9]+_[0-9]+";
-
- // returns jobID in Capturing Group 1
- static final Pattern confFileNameRegex =
- Pattern.compile("[^.].+_(" + jobIDRegex
- + ")_conf.xml(?:\\.[0-9a-zA-Z]+)?");
-
- // This can match text that confFileNameRegex will also match. The code
- // gives precedence to confFileNameRegex . Returns jobID
- // in Capturing Group 1
- static final Pattern jobFileNameRegex =
- Pattern.compile("[^.].+_(" + jobIDRegex + ")_.+");
-
static class MyOptions {
Class<? extends InputDemuxer> inputDemuxerClass = DefaultInputDemuxer.class;
@@ -164,11 +151,23 @@ public class TraceBuilder extends Config
* [especially for .crc files] we return null.
*/
static String extractJobID(String fileName) {
- return applyParser(fileName, jobFileNameRegex);
+ String jobId = applyParser(fileName, JobHistory.JOBHISTORY_FILENAME_REGEX);
+ if (jobId == null) {
+ // check if it's a pre21 jobhistory file
+ jobId = applyParser(fileName,
+ Pre21JobHistoryConstants.JOBHISTORY_FILENAME_REGEX);
+ }
+ return jobId;
}
static boolean isJobConfXml(String fileName, InputStream input) {
- return applyParser(fileName, confFileNameRegex) != null;
+ String jobId = applyParser(fileName, JobHistory.CONF_FILENAME_REGEX);
+ if (jobId == null) {
+ // check if it's a pre21 jobhistory conf file
+ jobId = applyParser(fileName,
+ Pre21JobHistoryConstants.CONF_FILENAME_REGEX);
+ }
+ return jobId != null;
}
private void addInterestedProperties(List<String> interestedProperties,