You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2010/07/16 08:35:26 UTC

svn commit: r964692 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapreduce/ src/java/org/apache/hadoop/mapreduce/jobhistory/ src/test/mapred/org/apache/hadoop/tools/rumen/ src/tools/org/apache/hadoop/tools/rumen/

Author: amareshwari
Date: Fri Jul 16 06:35:26 2010
New Revision: 964692

URL: http://svn.apache.org/viewvc?rev=964692&view=rev
Log:
MAPREDUCE-1865. Rumen should also support jobhistory files generated using trunk. Contributed by Amar Kamat

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobID.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=964692&r1=964691&r2=964692&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul 16 06:35:26 2010
@@ -167,6 +167,9 @@ Trunk (unreleased changes)
     HADOOP-6845. Also, introduces Credentials in JobConf, and in JobContext.
     (Jitendra Pandey and Arun Murthy via ddas)
 
+    MAPREDUCE-1865. Rumen should also support jobhistory files generated using
+    trunk. (Amar Kamat via amareshwari)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobID.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobID.java?rev=964692&r1=964691&r2=964692&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobID.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobID.java Fri Jul 16 06:35:26 2010
@@ -51,6 +51,11 @@ import org.apache.hadoop.io.Text;
 public class JobID extends org.apache.hadoop.mapred.ID 
                    implements Comparable<ID> {
   protected static final String JOB = "job";
+  
+  // Jobid regex for various tools and framework components
+  public static final String JOBID_REGEX = 
+    JOB + SEPARATOR + "[0-9]+" + SEPARATOR + "[0-9]+";
+  
   private final Text jtIdentifier;
   
   protected static final NumberFormat idFormat = NumberFormat.getInstance();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java?rev=964692&r1=964691&r2=964692&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java Fri Jul 16 06:35:26 2010
@@ -32,6 +32,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -92,6 +93,13 @@ public class JobHistory {
     Collections.<JobID,MovedFileInfo>synchronizedMap(
         new LinkedHashMap<JobID, MovedFileInfo>());
 
+  // JobHistory filename regex
+  public static final Pattern JOBHISTORY_FILENAME_REGEX = 
+    Pattern.compile("(" + JobID.JOBID_REGEX + ")_.+");
+  // JobHistory conf-filename regex
+  public static final Pattern CONF_FILENAME_REGEX =
+    Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
+  
   private static class MovedFileInfo {
     private final String historyFile;
     private final long timestamp;
@@ -372,13 +380,20 @@ public class JobHistory {
     return jobFilePath;
   }
 
+  /**
+   * Generates a suffix for old/stale jobhistory files.
+   * Resulting pattern: "." + identifier + ".old"
+   */
+  public static String getOldFileSuffix(String identifier) {
+    return "." + identifier + JobHistory.OLD_SUFFIX;
+  }
+  
   private void moveOldFiles() throws IOException {
     //move the log files remaining from last run to the DONE folder
     //suffix the file name based on Job tracker identifier so that history
     //files with same job id don't get over written in case of recovery.
     FileStatus[] files = logDirFs.listStatus(logDir);
-    String jtIdentifier = jobTracker.getTrackerIdentifier();
-    String fileSuffix = "." + jtIdentifier + JobHistory.OLD_SUFFIX;
+    String fileSuffix = getOldFileSuffix(jobTracker.getTrackerIdentifier());
     for (FileStatus fileStatus : files) {
       Path fromPath = fileStatus.getPath();
       if (fromPath.equals(done)) { //DONE can be a subfolder of log dir

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=964692&r1=964691&r2=964692&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Fri Jul 16 06:35:26 2010
@@ -26,6 +26,7 @@ import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -36,13 +37,23 @@ import org.apache.hadoop.io.compress.Cod
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TestNoJobSetupCleanup.MyOutputFormat;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.util.LineReader;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -324,6 +335,165 @@ public class TestRumenJobTraces {
     }
   }
 
+  /**
+   * Tests if {@link TraceBuilder} can correctly identify and parse jobhistory
+   * filenames. The testcase checks if {@link TraceBuilder}
+   *   - correctly identifies a jobhistory filename without suffix
+   *   - correctly parses a jobhistory filename without suffix to extract out 
+   *     the jobid
+   *   - correctly identifies a jobhistory filename with suffix
+   *   - correctly parses a jobhistory filename with suffix to extract out the 
+   *     jobid
+   *   - correctly identifies a job-configuration filename stored along with the 
+   *     jobhistory files
+   */
+  @Test
+  public void testJobHistoryFilenameParsing() throws IOException {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+    String user = "test";
+    org.apache.hadoop.mapred.JobID jid = 
+      new org.apache.hadoop.mapred.JobID("12345", 1);
+    final Path rootInputDir =
+      new Path(System.getProperty("test.tools.input.dir", ""))
+            .makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
+    
+    // Check if jobhistory filename are detected properly
+    Path jhFilename = JobHistory.getJobHistoryFile(rootInputDir, jid, user);
+    JobID extractedJID = 
+      JobID.forName(TraceBuilder.extractJobID(jhFilename.getName()));
+    assertEquals("TraceBuilder failed to parse the current JH filename", 
+                 jid, extractedJID);
+    // test jobhistory filename with old/stale file suffix
+    jhFilename = jhFilename.suffix(JobHistory.getOldFileSuffix("123"));
+    extractedJID =
+      JobID.forName(TraceBuilder.extractJobID(jhFilename.getName()));
+    assertEquals("TraceBuilder failed to parse the current JH filename"
+                 + "(old-suffix)", 
+                 jid, extractedJID);
+    
+    // Check if the conf filename in jobhistory are detected properly
+    Path jhConfFilename = JobHistory.getConfFile(rootInputDir, jid);
+    assertTrue("TraceBuilder failed to parse the current JH conf filename", 
+               TraceBuilder.isJobConfXml(jhConfFilename.getName(), null));
+    // test jobhistory conf filename with old/stale file suffix
+    jhConfFilename = jhConfFilename.suffix(JobHistory.getOldFileSuffix("123"));
+    assertTrue("TraceBuilder failed to parse the current JH conf filename" 
+               + " (old suffix)", 
+               TraceBuilder.isJobConfXml(jhConfFilename.getName(), null));
+  }
+
+  /**
+   * Test if {@link CurrentJHParser} can read events from current JH files.
+   */
+  @Test
+  public void testCurrentJHParser() throws Exception {
+    final Configuration conf = new Configuration();
+    final FileSystem lfs = FileSystem.getLocal(conf);
+
+    final Path rootTempDir =
+      new Path(System.getProperty("test.build.data", "/tmp")).makeQualified(
+          lfs.getUri(), lfs.getWorkingDirectory());
+
+    final Path tempDir = new Path(rootTempDir, "TestCurrentJHParser");
+    lfs.delete(tempDir, true);
+    
+    // Run a MR job
+    // create a MR cluster
+    conf.setInt(TTConfig.TT_MAP_SLOTS, 1);
+    conf.setInt(TTConfig.TT_REDUCE_SLOTS, 1);
+    MiniMRCluster mrCluster = new MiniMRCluster(1, "file:///", 1, null, null, 
+                                                new JobConf(conf));
+    
+    // run a job
+    Path inDir = new Path(tempDir, "input");
+    Path outDir = new Path(tempDir, "output");
+    JobHistoryParser parser = null;
+    RewindableInputStream ris = null;
+    ArrayList<String> seenEvents = new ArrayList<String>(10);
+    
+    try {
+      JobConf jConf = mrCluster.createJobConf();
+      // construct a job with 1 map and 1 reduce task.
+      Job job = MapReduceTestUtil.createJob(jConf, inDir, outDir, 1, 1);
+      // disable setup/cleanup
+      job.setJobSetupCleanupNeeded(false);
+      // set the output format to take care of the _temporary folder
+      job.setOutputFormatClass(MyOutputFormat.class);
+      // wait for the job to complete
+      job.waitForCompletion(false);
+      
+      assertTrue("Job failed", job.isSuccessful());
+
+      JobID id = job.getJobID();
+      JobClient jc = new JobClient(jConf);
+      String user = jc.getAllJobs()[0].getUsername();
+
+      // get the jobhistory filepath
+      Path jhPath = 
+        new Path(mrCluster.getJobTrackerRunner().getJobTracker()
+                          .getJobHistoryDir());
+      Path inputPath = JobHistory.getJobHistoryFile(jhPath, id, user);
+      // wait for 10 secs for the jobhistory file to move into the done folder
+      for (int i = 0; i < 100; ++i) {
+        if (lfs.exists(inputPath)) {
+          break;
+        }
+        TimeUnit.MILLISECONDS.wait(100);
+      }
+    
+      assertTrue("Missing job history file", lfs.exists(inputPath));
+    
+      InputDemuxer inputDemuxer = new DefaultInputDemuxer();
+      inputDemuxer.bindTo(inputPath, conf);
+    
+      Pair<String, InputStream> filePair = inputDemuxer.getNext();
+    
+      assertNotNull(filePair);
+    
+      ris = new RewindableInputStream(filePair.second());
+
+      // Test if the JobHistoryParserFactory can detect the parser correctly
+      parser = JobHistoryParserFactory.getParser(ris);
+        
+      HistoryEvent e;
+      while ((e = parser.nextEvent()) != null) {
+        String eventString = e.getEventType().toString();
+        System.out.println(eventString);
+        seenEvents.add(eventString);
+      }
+    } finally {
+      // stop the MR cluster
+      mrCluster.shutdown();
+      
+      if (ris != null) {
+          ris.close();
+      }
+      if (parser != null) {
+        parser.close();
+      }
+      
+      // cleanup the filesystem
+      lfs.delete(tempDir, true);
+    }
+
+    // Check against the gold standard
+    System.out.println("testCurrentJHParser validating using gold std ");
+    String[] goldLines = new String[] {"JOB_SUBMITTED", "JOB_PRIORITY_CHANGED",
+        "JOB_INITED", "JOB_STATUS_CHANGED", "TASK_STARTED", 
+        "MAP_ATTEMPT_STARTED", "MAP_ATTEMPT_FINISHED", "TASK_FINISHED", 
+        "TASK_STARTED", "REDUCE_ATTEMPT_STARTED", "REDUCE_ATTEMPT_FINISHED", 
+        "TASK_FINISHED", "JOB_FINISHED"};
+    
+    // Check the output with gold std
+    assertEquals("Size mismatch", goldLines.length, seenEvents.size());
+    
+    int index = 0;
+    for (String goldLine : goldLines) {
+      assertEquals("Content mismatch", goldLine, seenEvents.get(index++));
+    }
+  }
+  
   @Test
   public void testJobConfigurationParser() throws Exception {
     String[] list1 =

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java?rev=964692&r1=964691&r2=964692&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java Fri Jul 16 06:35:26 2010
@@ -26,6 +26,7 @@ import java.util.StringTokenizer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
@@ -403,6 +404,12 @@ public class JobBuilder {
       return Values.SETUP;
     }
 
+    // Note that pre-21, the task state of a successful task was logged as 
+    // SUCCESS while from 21 onwards, it's logged as SUCCEEDED.
+    if (name.equalsIgnoreCase(TaskStatus.State.SUCCEEDED.toString())) {
+      return Values.SUCCESS;
+    }
+    
     return Values.valueOf(name.toUpperCase());
   }
 

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java?rev=964692&r1=964691&r2=964692&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Pre21JobHistoryConstants.java Fri Jul 16 06:35:26 2010
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.tools.rumen;
 
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.mapreduce.JobID;
+
 /**
  * 
  *
@@ -45,5 +49,20 @@ public class Pre21JobHistoryConstants {
   public static enum Values {
     SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING, PREP, SETUP
   }
+  
+  /**
+   * Pre21 regex for jobhistory filenames, 
+   *   i.e. jt-identifier_job-id_user-name_job-name
+   */
+  static final Pattern JOBHISTORY_FILENAME_REGEX =
+    Pattern.compile("[^.].+_(" + JobID.JOBID_REGEX + ")_.+");
+
+  /**
+   * Pre21 regex for jobhistory conf filenames, 
+   *   i.e. jt-identifier_job-id_conf.xml
+   */
+  static final Pattern CONF_FILENAME_REGEX =
+    Pattern.compile("[^.].+_(" + JobID.JOBID_REGEX 
+                    + ")_conf.xml(?:\\.[0-9a-zA-Z]+)?");
  
 }

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java?rev=964692&r1=964691&r2=964692&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TraceBuilder.java Fri Jul 16 06:35:26 2010
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -53,20 +54,6 @@ public class TraceBuilder extends Config
   Outputter<LoggedJob> traceWriter;
   Outputter<LoggedNetworkTopology> topologyWriter;
 
-  // Needs to be interpreted greedily or otherwise constrained
-  static final String jobIDRegex = "job_[0-9]+_[0-9]+";
-
-  // returns jobID in Capturing Group 1
-  static final Pattern confFileNameRegex =
-      Pattern.compile("[^.].+_(" + jobIDRegex
-          + ")_conf.xml(?:\\.[0-9a-zA-Z]+)?");
-
-  // This can match text that confFileNameRegex will also match. The code
-  // gives precedence to confFileNameRegex . Returns jobID
-  // in Capturing Group 1
-  static final Pattern jobFileNameRegex =
-      Pattern.compile("[^.].+_(" + jobIDRegex + ")_.+");
-
   static class MyOptions {
     Class<? extends InputDemuxer> inputDemuxerClass = DefaultInputDemuxer.class;
 
@@ -164,11 +151,23 @@ public class TraceBuilder extends Config
    *         [especially for .crc files] we return null.
    */
   static String extractJobID(String fileName) {
-    return applyParser(fileName, jobFileNameRegex);
+    String jobId = applyParser(fileName, JobHistory.JOBHISTORY_FILENAME_REGEX);
+    if (jobId == null) {
+      // check if it's a pre21 jobhistory file
+      jobId = applyParser(fileName, 
+                          Pre21JobHistoryConstants.JOBHISTORY_FILENAME_REGEX);
+    }
+    return jobId;
   }
 
   static boolean isJobConfXml(String fileName, InputStream input) {
-    return applyParser(fileName, confFileNameRegex) != null;
+    String jobId = applyParser(fileName, JobHistory.CONF_FILENAME_REGEX);
+    if (jobId == null) {
+      // check if it's a pre21 jobhistory conf file
+      jobId = applyParser(fileName, 
+                          Pre21JobHistoryConstants.CONF_FILENAME_REGEX);
+    }
+    return jobId != null;
   }
 
   private void addInterestedProperties(List<String> interestedProperties,