You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/09/29 10:47:22 UTC
svn commit: r700028 - in /hadoop/core/trunk: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/JobHistory.java
src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java
Author: ddas
Date: Mon Sep 29 01:47:21 2008
New Revision: 700028
URL: http://svn.apache.org/viewvc?rev=700028&view=rev
Log:
HADOOP-4190. Fixes the backward compatibility issue with Job History introduced by HADOOP-3245 and HADOOP-2403. Contributed by Amar Kamat.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=700028&r1=700027&r2=700028&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Sep 29 01:47:21 2008
@@ -779,6 +779,9 @@
HADOOP-4189. Fixes the history blocksize & intertracker protocol version
issues introduced as part of HADOOP-3245. (Amar Kamat via ddas)
+ HADOOP-4190. Fixes the backward compatibility issue with Job History
+ introduced by HADOOP-3245 and HADOOP-2403. (Amar Kamat via ddas)
+
Release 0.18.2 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=700028&r1=700027&r2=700028&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Mon Sep 29 01:47:21 2008
@@ -62,12 +62,21 @@
* For parsing the job history it supports a listener based interface where each line is parsed
* and passed to listener. The listener can create an object model of history or look for specific
* events and discard rest of the history.
+ *
+ * CHANGE LOG :
+ * Version 0 : The history has the following format :
+ * TAG KEY1="VALUE1" KEY2="VALUE2" and so on.
+ TAG can be Job, Task, MapAttempt or ReduceAttempt.
+ Note that a '"' is the line delimiter.
+ * Version 1 : Changes the line delimiter to '.'
+ Values are now escaped for unambiguous parsing.
+ Added the Meta tag to store version info.
*/
public class JobHistory {
+ static final long VERSION = 1L;
public static final Log LOG = LogFactory.getLog(JobHistory.class);
private static final String DELIMITER = " ";
- private static final String LINE_DELIMITER = ".";
private static final char LINE_DELIMITER_CHAR = '.';
private static final char[] charsToEscape = new char[] {'"', '=',
LINE_DELIMITER_CHAR};
@@ -91,7 +100,7 @@
* A record type appears as the first token in a single line of log.
*/
public static enum RecordTypes {
- Jobtracker, Job, Task, MapAttempt, ReduceAttempt
+ Jobtracker, Job, Task, MapAttempt, ReduceAttempt, Meta
}
/**
@@ -105,7 +114,7 @@
FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE,
ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE,
SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT,
- TRACKER_NAME, STATE_STRING
+ TRACKER_NAME, STATE_STRING, VERSION
}
/**
@@ -159,6 +168,70 @@
}
/**
+ * Manages job-history's meta information such as version etc.
+ * Helps in logging version information to the job-history and recover
+ * version information from the history.
+ */
+ static class MetaInfoManager implements Listener {
+ private long version = 0L;
+ private KeyValuePair pairs = new KeyValuePair();
+
+ // Extract the version of the history that was used to write the history
+ public MetaInfoManager(String line) throws IOException {
+ if (null != line) {
+ // Parse the line
+ parseLine(line, this, false);
+ }
+ }
+
+ // Get the line delimiter
+ char getLineDelim() {
+ if (version == 0) {
+ return '"';
+ } else {
+ return LINE_DELIMITER_CHAR;
+ }
+ }
+
+ // Checks if the values are escaped or not
+ boolean isValueEscaped() {
+ // Note that the values are not escaped in version 0
+ return version != 0;
+ }
+
+ public void handle(RecordTypes recType, Map<Keys, String> values)
+ throws IOException {
+ // Check if the record is of type META
+ if (RecordTypes.Meta == recType) {
+ pairs.handle(values);
+ version = pairs.getLong(Keys.VERSION); // defaults to 0
+ }
+ }
+
+ /**
+ * Logs history meta-info to the history file. This needs to be called once
+ * per history file.
+ * @param writers the history writers to which the meta-info record is logged.
+ */
+ static void logMetaInfo(ArrayList<PrintWriter> writers){
+ if (!disableHistory){
+ if (null != writers){
+ JobHistory.log(writers, RecordTypes.Meta,
+ new Keys[] {Keys.VERSION},
+ new String[] {String.valueOf(VERSION)});
+ }
+ }
+ }
+ }
+
+ /** Escapes the string especially for {@link JobHistory}
+ */
+ static String escapeString(String data) {
+ return StringUtils.escapeString(data, StringUtils.ESCAPE_CHAR,
+ charsToEscape);
+ }
+
+ /**
* Parses history file and invokes Listener.handle() for
* each line of history. It can be used for looking through history
* files for specific items without having to keep whole history in memory.
@@ -174,17 +247,34 @@
try {
String line = null;
StringBuffer buf = new StringBuffer();
- while ((line = reader.readLine())!= null){
+
+ // Read the meta-info line. Note that this might be a jobinfo line for files
+ // written with older format
+ line = reader.readLine();
+
+ // Check if the file is empty
+ if (line == null) {
+ return;
+ }
+
+ // Get the information required for further processing
+ MetaInfoManager mgr = new MetaInfoManager(line);
+ boolean isEscaped = mgr.isValueEscaped();
+ String lineDelim = String.valueOf(mgr.getLineDelim());
+ String escapedLineDelim =
+ StringUtils.escapeString(lineDelim, StringUtils.ESCAPE_CHAR,
+ mgr.getLineDelim());
+
+ do {
buf.append(line);
- if (!line.trim().endsWith(LINE_DELIMITER) ||
- line.trim().endsWith(StringUtils.escapeString(LINE_DELIMITER,
- StringUtils.ESCAPE_CHAR, LINE_DELIMITER_CHAR))) {
+ if (!line.trim().endsWith(lineDelim)
+ || line.trim().endsWith(escapedLineDelim)) {
buf.append("\n");
continue;
}
- parseLine(buf.toString(), l);
+ parseLine(buf.toString(), l, isEscaped);
buf = new StringBuffer();
- }
+ } while ((line = reader.readLine())!= null);
} finally {
try { reader.close(); } catch (IOException ex) {}
}
@@ -196,7 +286,8 @@
* @param l
* @throws IOException
*/
- private static void parseLine(String line, Listener l)throws IOException{
+ private static void parseLine(String line, Listener l, boolean isEscaped)
+ throws IOException{
// extract the record type
int idx = line.indexOf(' ');
String recType = line.substring(0, idx);
@@ -208,8 +299,10 @@
String tuple = matcher.group(0);
String []parts = StringUtils.split(tuple, StringUtils.ESCAPE_CHAR, '=');
String value = parts[1].substring(1, parts[1].length() -1);
- value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR,
- charsToEscape);
+ if (isEscaped) {
+ value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR,
+ charsToEscape);
+ }
parseBuffer.put(Keys.valueOf(parts[0]), value);
}
@@ -228,10 +321,9 @@
static void log(PrintWriter out, RecordTypes recordType, Keys key,
String value){
- value = StringUtils.escapeString(value, StringUtils.ESCAPE_CHAR,
- charsToEscape);
+ value = escapeString(value);
out.println(recordType.name() + DELIMITER + key + "=\"" + value + "\""
- + DELIMITER + LINE_DELIMITER);
+ + DELIMITER + LINE_DELIMITER_CHAR);
}
/**
@@ -249,13 +341,12 @@
for(int i =0; i< keys.length; i++){
buf.append(keys[i]);
buf.append("=\"");
- values[i] = StringUtils.escapeString(values[i],
- StringUtils.ESCAPE_CHAR, charsToEscape);
+ values[i] = escapeString(values[i]);
buf.append(values[i]);
buf.append("\"");
buf.append(DELIMITER);
}
- buf.append(LINE_DELIMITER);
+ buf.append(LINE_DELIMITER_CHAR);
for (PrintWriter out : writers) {
out.println(buf.toString());
@@ -747,6 +838,9 @@
}
openJobs.put(jobUniqueString, writers);
+
+ // Log the history meta info
+ JobHistory.MetaInfoManager.logMetaInfo(writers);
//add to writer as well
JobHistory.log(writers, RecordTypes.Job,
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java?rev=700028&r1=700027&r2=700028&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java Mon Sep 29 01:47:21 2008
@@ -73,6 +73,10 @@
"\t\b\n\f\"\n in it";
String value4 = "Value ends with escape\\";
String value5 = "Value ends with \\\" \\.\n";
+
+ // Log the history version
+ JobHistory.MetaInfoManager.logMetaInfo(historyWriter);
+
JobHistory.log(historyWriter, RecordTypes.Job,
new JobHistory.Keys[] {Keys.JOBTRACKERID,
Keys.TRACKER_NAME,