You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/09/29 10:47:22 UTC

svn commit: r700028 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobHistory.java src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java

Author: ddas
Date: Mon Sep 29 01:47:21 2008
New Revision: 700028

URL: http://svn.apache.org/viewvc?rev=700028&view=rev
Log:
HADOOP-4190. Fixes the backward compatibility issue with Job History introduced by HADOOP-3245 and HADOOP-2403. Contributed by Amar Kamat.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=700028&r1=700027&r2=700028&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Sep 29 01:47:21 2008
@@ -779,6 +779,9 @@
     HADOOP-4189. Fixes the history blocksize & intertracker protocol version
     issues introduced as part of HADOOP-3245. (Amar Kamat via ddas)
 
+    HADOOP-4190. Fixes the backward compatibility issue with Job History.
+    introduced by HADOOP-3245 and HADOOP-2403. (Amar Kamat via ddas)
+
 Release 0.18.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=700028&r1=700027&r2=700028&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Mon Sep 29 01:47:21 2008
@@ -62,12 +62,21 @@
  * For parsing the job history it supports a listener based interface where each line is parsed
  * and passed to listener. The listener can create an object model of history or look for specific 
  * events and discard rest of the history.  
+ * 
+ * CHANGE LOG :
+ * Version 0 : The history has the following format : 
+ *             TAG KEY1="VALUE1" KEY2="VALUE2" and so on. 
+               TAG can be Job, Task, MapAttempt or ReduceAttempt. 
+               Note that a '"' is the line delimiter.
+ * Version 1 : Changes the line delimiter to '.'
+               Values are now escaped for unambiguous parsing. 
+               Added the Meta tag to store version info.
  */
 public class JobHistory {
   
+  static final long VERSION = 1L;
   public static final Log LOG = LogFactory.getLog(JobHistory.class);
   private static final String DELIMITER = " ";
-  private static final String LINE_DELIMITER = ".";
   private static final char LINE_DELIMITER_CHAR = '.';
   private static final char[] charsToEscape = new char[] {'"', '=', 
                                                 LINE_DELIMITER_CHAR};
@@ -91,7 +100,7 @@
    * A record type appears as the first token in a single line of log. 
    */
   public static enum RecordTypes {
-    Jobtracker, Job, Task, MapAttempt, ReduceAttempt
+    Jobtracker, Job, Task, MapAttempt, ReduceAttempt, Meta
   }
 
   /**
@@ -105,7 +114,7 @@
     FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, 
     ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, 
     SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT, 
-    TRACKER_NAME, STATE_STRING
+    TRACKER_NAME, STATE_STRING, VERSION
   }
 
   /**
@@ -159,6 +168,70 @@
   }
 
   /**
+   * Manages job-history's meta information such as version etc.
+   * Helps in logging version information to the job-history and recover
+   * version information from the history. 
+   */
+  static class MetaInfoManager implements Listener {
+    private long version = 0L;
+    private KeyValuePair pairs = new KeyValuePair();
+    
+    // Extract the version of the history that was used to write the history
+    public MetaInfoManager(String line) throws IOException {
+      if (null != line) {
+        // Parse the line
+        parseLine(line, this, false);
+      }
+    }
+    
+    // Get the line delimiter
+    char getLineDelim() {
+      if (version == 0) {
+        return '"';
+      } else {
+        return LINE_DELIMITER_CHAR;
+      }
+    }
+    
+    // Checks if the values are escaped or not
+    boolean isValueEscaped() {
+      // Note that the values are not escaped in version 0
+      return version != 0;
+    }
+    
+    public void handle(RecordTypes recType, Map<Keys, String> values) 
+    throws IOException {
+      // Check if the record is of type META
+      if (RecordTypes.Meta == recType) {
+        pairs.handle(values);
+        version = pairs.getLong(Keys.VERSION); // defaults to 0
+      }
+    }
+    
+    /**
+     * Logs history meta-info to the history file. This needs to be called once
+     * per history file. 
+     * @param jobId job id, assigned by jobtracker. 
+     */
+    static void logMetaInfo(ArrayList<PrintWriter> writers){
+      if (!disableHistory){
+        if (null != writers){
+          JobHistory.log(writers, RecordTypes.Meta, 
+              new Keys[] {Keys.VERSION},
+              new String[] {String.valueOf(VERSION)}); 
+        }
+      }
+    }
+  }
+  
+  /** Escapes the string especially for {@link JobHistory}
+   */
+  static String escapeString(String data) {
+    return StringUtils.escapeString(data, StringUtils.ESCAPE_CHAR, 
+                                    charsToEscape);
+  }
+  
+  /**
    * Parses history file and invokes Listener.handle() for 
    * each line of history. It can be used for looking through history
    * files for specific items without having to keep whole history in memory. 
@@ -174,17 +247,34 @@
     try {
       String line = null; 
       StringBuffer buf = new StringBuffer(); 
-      while ((line = reader.readLine())!= null){
+      
+      // Read the meta-info line. Note that this might a jobinfo line for files
+      // written with older format
+      line = reader.readLine();
+      
+      // Check if the file is empty
+      if (line == null) {
+        return;
+      }
+      
+      // Get the information required for further processing
+      MetaInfoManager mgr = new MetaInfoManager(line);
+      boolean isEscaped = mgr.isValueEscaped();
+      String lineDelim = String.valueOf(mgr.getLineDelim());  
+      String escapedLineDelim = 
+        StringUtils.escapeString(lineDelim, StringUtils.ESCAPE_CHAR, 
+                                 mgr.getLineDelim());
+      
+      do {
         buf.append(line); 
-        if (!line.trim().endsWith(LINE_DELIMITER) || 
-            line.trim().endsWith(StringUtils.escapeString(LINE_DELIMITER,
-                          StringUtils.ESCAPE_CHAR, LINE_DELIMITER_CHAR))) {
+        if (!line.trim().endsWith(lineDelim) 
+            || line.trim().endsWith(escapedLineDelim)) {
           buf.append("\n");
           continue; 
         }
-        parseLine(buf.toString(), l);
+        parseLine(buf.toString(), l, isEscaped);
         buf = new StringBuffer(); 
-      }
+      } while ((line = reader.readLine())!= null);
     } finally {
       try { reader.close(); } catch (IOException ex) {}
     }
@@ -196,7 +286,8 @@
    * @param l
    * @throws IOException
    */
-  private static void parseLine(String line, Listener l)throws IOException{
+  private static void parseLine(String line, Listener l, boolean isEscaped) 
+  throws IOException{
     // extract the record type 
     int idx = line.indexOf(' '); 
     String recType = line.substring(0, idx);
@@ -208,8 +299,10 @@
       String tuple = matcher.group(0);
       String []parts = StringUtils.split(tuple, StringUtils.ESCAPE_CHAR, '=');
       String value = parts[1].substring(1, parts[1].length() -1);
-      value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR,
-                                         charsToEscape);
+      if (isEscaped) {
+        value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR,
+                                           charsToEscape);
+      }
       parseBuffer.put(Keys.valueOf(parts[0]), value);
     }
 
@@ -228,10 +321,9 @@
   
   static void log(PrintWriter out, RecordTypes recordType, Keys key, 
                   String value){
-    value = StringUtils.escapeString(value, StringUtils.ESCAPE_CHAR, 
-                                     charsToEscape);
+    value = escapeString(value);
     out.println(recordType.name() + DELIMITER + key + "=\"" + value + "\""
-                + DELIMITER + LINE_DELIMITER); 
+                + DELIMITER + LINE_DELIMITER_CHAR); 
   }
   
   /**
@@ -249,13 +341,12 @@
     for(int i =0; i< keys.length; i++){
       buf.append(keys[i]);
       buf.append("=\"");
-      values[i] = StringUtils.escapeString(values[i],
-                                StringUtils.ESCAPE_CHAR, charsToEscape);
+      values[i] = escapeString(values[i]);
       buf.append(values[i]);
       buf.append("\"");
       buf.append(DELIMITER); 
     }
-    buf.append(LINE_DELIMITER);
+    buf.append(LINE_DELIMITER_CHAR);
     
     for (PrintWriter out : writers) {
       out.println(buf.toString());
@@ -747,6 +838,9 @@
           }
 
           openJobs.put(jobUniqueString, writers);
+          
+          // Log the history meta info
+          JobHistory.MetaInfoManager.logMetaInfo(writers);
 
           //add to writer as well 
           JobHistory.log(writers, RecordTypes.Job, 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java?rev=700028&r1=700027&r2=700028&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java Mon Sep 29 01:47:21 2008
@@ -73,6 +73,10 @@
                     "\t\b\n\f\"\n in it";
     String value4 = "Value ends with escape\\";
     String value5 = "Value ends with \\\" \\.\n";
+    
+    // Log the history version
+    JobHistory.MetaInfoManager.logMetaInfo(historyWriter);
+    
     JobHistory.log(historyWriter, RecordTypes.Job, 
                    new JobHistory.Keys[] {Keys.JOBTRACKERID, 
                                           Keys.TRACKER_NAME,