You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:23:01 UTC

svn commit: r1077515 [3/4] - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapreduce/ test/org/apache/hadoop/tools/rumen/ test/tools/data/rumen/small-trace-test/ test/tools/data/rumen/small-trace-test/counters-fo...

Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java Fri Mar  4 04:22:59 2011
@@ -47,9 +47,6 @@ public class LoggedTask implements DeepC
   List<LoggedTaskAttempt> attempts = new ArrayList<LoggedTaskAttempt>();
   List<LoggedLocation> preferredLocations = Collections.emptyList();
 
-  int numberMaps = -1;
-  int numberReduces = -1;
-
   static private Set<String> alreadySeenAnySetterAttributes =
       new TreeSet<String>();
 
@@ -68,6 +65,15 @@ public class LoggedTask implements DeepC
     super();
   }
 
+  /** Shifts this task's start/finish times and those of all its attempts. */
+  void adjustTimes(long adjustment) {
+    startTime += adjustment;
+    finishTime += adjustment;
+    for (LoggedTaskAttempt eachAttempt : attempts) {
+      eachAttempt.adjustTimes(adjustment);
+    }
+  }
+
   public long getInputBytes() {
     return inputBytes;
   }
@@ -148,22 +154,6 @@ public class LoggedTask implements DeepC
     }
   }
 
-  public int getNumberMaps() {
-    return numberMaps;
-  }
-
-  void setNumberMaps(int numberMaps) {
-    this.numberMaps = numberMaps;
-  }
-
-  public int getNumberReduces() {
-    return numberReduces;
-  }
-
-  void setNumberReduces(int numberReduces) {
-    this.numberReduces = numberReduces;
-  }
-
   public Pre21JobHistoryConstants.Values getTaskStatus() {
     return taskStatus;
   }
@@ -180,6 +170,110 @@ public class LoggedTask implements DeepC
     this.taskType = taskType;
   }
 
+  /** Fills the task's I/O totals from the map-side counters, when present. */
+  private void incorporateMapCounters(JhCounters counters) {
+    SetField inputBytesSetter = new SetField(this) {
+      @Override
+      void set(long val) { task.inputBytes = val; }
+    };
+    incorporateCounter(inputBytesSetter, counters, "HDFS_BYTES_READ");
+
+    SetField outputBytesSetter = new SetField(this) {
+      @Override
+      void set(long val) { task.outputBytes = val; }
+    };
+    incorporateCounter(outputBytesSetter, counters, "FILE_BYTES_WRITTEN");
+
+    SetField inputRecordsSetter = new SetField(this) {
+      @Override
+      void set(long val) { task.inputRecords = val; }
+    };
+    incorporateCounter(inputRecordsSetter, counters, "MAP_INPUT_RECORDS");
+
+    SetField outputRecordsSetter = new SetField(this) {
+      @Override
+      void set(long val) { task.outputRecords = val; }
+    };
+    incorporateCounter(outputRecordsSetter, counters, "MAP_OUTPUT_RECORDS");
+  }
+
+  /** Fills the task's I/O totals from the reduce-side counters, if present. */
+  private void incorporateReduceCounters(JhCounters counters) {
+    SetField inputBytesSetter = new SetField(this) {
+      @Override
+      void set(long val) { task.inputBytes = val; }
+    };
+    incorporateCounter(inputBytesSetter, counters, "REDUCE_SHUFFLE_BYTES");
+
+    SetField outputBytesSetter = new SetField(this) {
+      @Override
+      void set(long val) { task.outputBytes = val; }
+    };
+    incorporateCounter(outputBytesSetter, counters, "HDFS_BYTES_WRITTEN");
+
+    SetField inputRecordsSetter = new SetField(this) {
+      @Override
+      void set(long val) { task.inputRecords = val; }
+    };
+    incorporateCounter(inputRecordsSetter, counters, "REDUCE_INPUT_RECORDS");
+
+    SetField outputRecordsSetter = new SetField(this) {
+      @Override
+      void set(long val) { task.outputRecords = val; }
+    };
+    incorporateCounter(outputRecordsSetter, counters, "REDUCE_OUTPUT_RECORDS");
+  }
+
+  // Incorporates event counters, dispatching on taskType (MAP or REDUCE).
+  // LoggedTask MUST KNOW ITS TYPE BEFORE THIS CALL
+  public void incorporateCounters(JhCounters counters) {
+    switch (taskType) {
+    case MAP:
+      incorporateMapCounters(counters);
+      return;
+    case REDUCE:
+      incorporateReduceCounters(counters);
+      return;
+      // NOT exhaustive: any other task type leaves the task unchanged
+    }
+  }
+
+  /**
+   * Lower-cases a counter name and maps ' ', '-', '_' and '.' to '|' so that
+   * differently spelled variants of the same counter compare equal.
+   */
+  private static String canonicalizeCounterName(String nonCanonicalName) {
+    // Locale.ENGLISH keeps 'I' -> 'i' even under e.g. a Turkish default locale.
+    String result = nonCanonicalName.toLowerCase(java.util.Locale.ENGLISH);
+    return result.replace(' ', '|').replace('-', '|').replace('_', '|')
+        .replace('.', '|');
+  }
+
+  /** Callback that stores one counter value into a field of {@code task}. */
+  private abstract static class SetField {
+    final LoggedTask task;
+
+    SetField(LoggedTask task) {
+      this.task = task;
+    }
+    abstract void set(long value);
+  }
+
+  /** Feeds the first counter whose canonical name matches to the thunk. */
+  private static void incorporateCounter(SetField thunk, JhCounters counters,
+      String counterName) {
+    final String wanted = canonicalizeCounterName(counterName);
+    for (JhCounterGroup group : counters.groups) {
+      for (JhCounter counter : group.counts) {
+        String candidate = canonicalizeCounterName(counter.name.toString());
+        if (wanted.equals(candidate)) {
+          thunk.set(counter.value);
+          return; // only the first match is used
+        }
+      }
+    }
+  }
+
   private void compare1(long c1, long c2, TreePath loc, String eltname)
       throws DeepInequalityException {
     if (c1 != c2) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Fri Mar  4 04:22:59 2011
@@ -82,6 +82,11 @@ public class LoggedTaskAttempt implement
     }
   }
 
+  void adjustTimes(long adjustment) { // shift both timestamps by adjustment
+    startTime += adjustment;
+    finishTime += adjustment;
+  }
+
   public long getShuffleFinished() {
     return shuffleFinished;
   }
@@ -135,7 +140,7 @@ public class LoggedTaskAttempt implement
   }
 
   void setHostName(String hostName) {
-    this.hostName = (hostName == null) ? null : hostName.intern();
+    this.hostName = hostName == null ? null : hostName.intern();
   }
 
   public long getHdfsBytesRead() {
@@ -258,6 +263,130 @@ public class LoggedTaskAttempt implement
     this.mapInputBytes = mapInputBytes;
   }
 
+  // Incorporates event counters: every known counter found in the event is
+  // copied into the matching field; counters that are absent are skipped.
+  public void incorporateCounters(JhCounters counters) {
+    SetField hdfsBytesReadSetter = new SetField(this) {
+      @Override
+      void set(long val) { attempt.hdfsBytesRead = val; }
+    };
+    incorporateCounter(hdfsBytesReadSetter, counters, "HDFS_BYTES_READ");
+
+    SetField hdfsBytesWrittenSetter = new SetField(this) {
+      @Override
+      void set(long val) { attempt.hdfsBytesWritten = val; }
+    };
+    incorporateCounter(hdfsBytesWrittenSetter, counters, "HDFS_BYTES_WRITTEN");
+
+    SetField fileBytesReadSetter = new SetField(this) {
+      @Override
+      void set(long val) { attempt.fileBytesRead = val; }
+    };
+    incorporateCounter(fileBytesReadSetter, counters, "FILE_BYTES_READ");
+
+    SetField fileBytesWrittenSetter = new SetField(this) {
+      @Override
+      void set(long val) { attempt.fileBytesWritten = val; }
+    };
+    incorporateCounter(fileBytesWrittenSetter, counters, "FILE_BYTES_WRITTEN");
+
+    SetField mapInputBytesSetter = new SetField(this) {
+      @Override
+      void set(long val) { attempt.mapInputBytes = val; }
+    };
+    incorporateCounter(mapInputBytesSetter, counters, "MAP_INPUT_BYTES");
+
+    SetField mapInputRecordsSetter = new SetField(this) {
+      @Override
+      void set(long val) { attempt.mapInputRecords = val; }
+    };
+    incorporateCounter(mapInputRecordsSetter, counters, "MAP_INPUT_RECORDS");
+
+    SetField mapOutputBytesSetter = new SetField(this) {
+      @Override
+      void set(long val) { attempt.mapOutputBytes = val; }
+    };
+    incorporateCounter(mapOutputBytesSetter, counters, "MAP_OUTPUT_BYTES");
+
+    SetField mapOutputRecordsSetter = new SetField(this) {
+      @Override
+      void set(long val) { attempt.mapOutputRecords = val; }
+    };
+    incorporateCounter(mapOutputRecordsSetter, counters, "MAP_OUTPUT_RECORDS");
+
+    SetField combineInputSetter = new SetField(this) {
+      @Override
+      void set(long val) { attempt.combineInputRecords = val; }
+    };
+    incorporateCounter(combineInputSetter, counters, "COMBINE_INPUT_RECORDS");
+
+    SetField reduceInputGroupsSetter = new SetField(this) {
+      @Override
+      void set(long val) { attempt.reduceInputGroups = val; }
+    };
+    incorporateCounter(reduceInputGroupsSetter, counters, "REDUCE_INPUT_GROUPS");
+
+    SetField reduceInputSetter = new SetField(this) {
+      @Override
+      void set(long val) { attempt.reduceInputRecords = val; }
+    };
+    incorporateCounter(reduceInputSetter, counters, "REDUCE_INPUT_RECORDS");
+
+    SetField reduceShuffleSetter = new SetField(this) {
+      @Override
+      void set(long val) { attempt.reduceShuffleBytes = val; }
+    };
+    incorporateCounter(reduceShuffleSetter, counters, "REDUCE_SHUFFLE_BYTES");
+
+    SetField reduceOutputSetter = new SetField(this) {
+      @Override
+      void set(long val) { attempt.reduceOutputRecords = val; }
+    };
+    incorporateCounter(reduceOutputSetter, counters, "REDUCE_OUTPUT_RECORDS");
+
+    SetField spilledRecordsSetter = new SetField(this) {
+      @Override
+      void set(long val) { attempt.spilledRecords = val; }
+    };
+    incorporateCounter(spilledRecordsSetter, counters, "SPILLED_RECORDS");
+  }
+
+  /**
+   * Lower-cases a counter name and maps ' ', '-', '_' and '.' to '|' so that
+   * differently spelled variants of the same counter compare equal.
+   */
+  private static String canonicalizeCounterName(String nonCanonicalName) {
+    // Locale.ENGLISH keeps 'I' -> 'i' even under e.g. a Turkish default locale.
+    String result = nonCanonicalName.toLowerCase(java.util.Locale.ENGLISH);
+    return result.replace(' ', '|').replace('-', '|').replace('_', '|')
+        .replace('.', '|');
+  }
+
+  /** Callback storing one counter value into a field of {@code attempt}. */
+  private abstract static class SetField {
+    final LoggedTaskAttempt attempt;
+
+    SetField(LoggedTaskAttempt attempt) {
+      this.attempt = attempt;
+    }
+    abstract void set(long value);
+  }
+
+  /** Feeds the first counter whose canonical name matches to the thunk. */
+  private static void incorporateCounter(SetField thunk, JhCounters counters,
+      String counterName) {
+    final String wanted = canonicalizeCounterName(counterName);
+    for (JhCounterGroup group : counters.groups) {
+      for (JhCounter counter : group.counts) {
+        String candidate = canonicalizeCounterName(counter.name.toString());
+        if (wanted.equals(candidate)) {
+          thunk.set(counter.value);
+          return; // only the first match is used
+        }
+      }
+    }
+  }
+
   private void compare1(String c1, String c2, TreePath loc, String eltname)
       throws DeepInequalityException {
     if (c1 == null && c2 == null) {

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/** History event emitter for map attempts; emits MapAttemptFinishedEvent. */
+public class MapAttempt20LineHistoryEventEmitter extends
+    TaskAttempt20LineEventEmitter {
+
+  static List<SingleEventEmitter> nonFinals =
+      new LinkedList<SingleEventEmitter>();
+  static List<SingleEventEmitter> finals = new LinkedList<SingleEventEmitter>();
+
+  static {
+    nonFinals.addAll(taskEventNonFinalSEEs);
+    finals.add(new MapAttemptFinishedEventEmitter());
+  }
+
+  protected MapAttempt20LineHistoryEventEmitter() {
+    super();
+  }
+
+  /** Emits a finished event for lines that report a successful attempt. */
+  static private class MapAttemptFinishedEventEmitter extends
+      SingleEventEmitter {
+    /**
+     * @return the event, or null when the attempt id, FINISH_TIME or a
+     *         "success" TASK_STATUS is missing from the line
+     */
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
+        HistoryEventEmitter thatg) {
+      if (taskAttemptIDName == null) {
+        return null;
+      }
+
+      TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+      String status = line.get("TASK_STATUS");
+
+      if (finishTime == null || !"success".equalsIgnoreCase(status)) {
+        return null;
+      }
+
+      MapAttempt20LineHistoryEventEmitter that =
+          (MapAttempt20LineHistoryEventEmitter) thatg;
+      // Only one timestamp is read from the line, so it serves as both the
+      // map-phase finish time and the attempt finish time.
+      long parsedFinishTime = Long.parseLong(finishTime);
+
+      return new MapAttemptFinishedEvent(taskAttemptID, that.originalTaskType,
+          status, parsedFinishTime, parsedFinishTime, line.get("HOSTNAME"),
+          line.get("STATE_STRING"), maybeParseCounters(line.get("COUNTERS")));
+    }
+  }
+
+  @Override
+  List<SingleEventEmitter> finalSEEs() {
+    return finals;
+  }
+
+  @Override
+  List<SingleEventEmitter> nonFinalSEEs() {
+    return nonFinals;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttemptFinishedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttemptFinishedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/MapAttemptFinishedEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * History event describing the successful completion of one map attempt.
+ * All state is set by the constructor and exposed through getters only.
+ */
+public class MapAttemptFinishedEvent implements HistoryEvent {
+  private TaskID taskId;
+  private TaskAttemptID attemptId;
+  private TaskType taskType;
+  private String taskStatus;
+  private long mapFinishTime;
+  private long finishTime;
+  private String hostname;
+  private String state;
+  private JhCounters counters;
+
+  /**
+   * Builds the event for a successfully completed map attempt.
+   *
+   * @param id attempt ID; the task ID is derived from it
+   * @param taskType type of the task
+   * @param taskStatus status of the task
+   * @param mapFinishTime finish time of the map phase
+   * @param finishTime finish time of the attempt
+   * @param hostname name of the host where the map executed
+   * @param state state string for the attempt
+   * @param counters counters for the attempt
+   */
+  public MapAttemptFinishedEvent(
+      TaskAttemptID id, TaskType taskType, String taskStatus,
+      long mapFinishTime, long finishTime,
+      String hostname, String state, Counters counters) {
+    this.taskId = id.getTaskID();
+    this.attemptId = id;
+    this.taskType = taskType;
+    this.taskStatus = taskStatus;
+    this.mapFinishTime = mapFinishTime;
+    this.finishTime = finishTime;
+    this.hostname = hostname;
+    this.state = state;
+    this.counters = new JhCounters(counters, "COUNTERS");
+  }
+
+  /** Returns the ID of the task this attempt belongs to. */
+  public TaskID getTaskId() { return taskId; }
+  /** Returns the attempt ID. */
+  public TaskAttemptID getAttemptId() { return attemptId; }
+
+  /** Returns the task type. */
+  public TaskType getTaskType() { return taskType; }
+  /** Returns the task status. */
+  public String getTaskStatus() { return taskStatus; }
+
+  /** Returns the finish time of the map phase. */
+  public long getMapFinishTime() { return mapFinishTime; }
+  /** Returns the finish time of the attempt. */
+  public long getFinishTime() { return finishTime; }
+
+  /** Returns the host name. */
+  public String getHostname() { return hostname; }
+  /** Returns the state string. */
+  public String getState() { return state; }
+
+  /** Returns the counters. */
+  public JhCounters getCounters() { return counters; }
+
+  /** Returns the event type, always MAP_ATTEMPT_FINISHED. */
+  public EventType getEventType() { return EventType.MAP_ATTEMPT_FINISHED; }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Outputter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Outputter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Outputter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Outputter.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Sink for a sequence of objects of type T, bound to an output file.
+ */
+public interface Outputter<T> extends Closeable {
+  /**
+   * Initializes this {@link Outputter} to write to a specific path.
+   * @param path the {@link Path} of the output file
+   * @param conf the configuration to use
+   * @throws IOException
+   */
+  void init(Path path, Configuration conf) throws IOException;
+
+  /**
+   * Outputs one object.
+   * @param object the object to output
+   * @throws IOException
+   */
+  void output(T object) throws IOException;
+
+}
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java Fri Mar  4 04:22:59 2011
@@ -37,8 +37,10 @@ import org.w3c.dom.Text;
 import org.xml.sax.SAXException;
 
 class ParsedConfigFile {
-  private static final Pattern jobIDPattern = Pattern.compile("_(job_[0-9]+_[0-9]+)_");
-  private static final Pattern heapPattern = Pattern.compile("-Xmx([0-9]+)([mMgG])");
+  private static final Pattern jobIDPattern =
+      Pattern.compile("_(job_[0-9]+_[0-9]+)_");
+  private static final Pattern heapPattern =
+      Pattern.compile("-Xmx([0-9]+)([mMgG])");
 
   final int heapMegabytes;
 
@@ -68,6 +70,7 @@ class ParsedConfigFile {
   }
 
   @SuppressWarnings("hiding")
+  @Deprecated
   ParsedConfigFile(String filenameLine, String xmlString) {
     super();
 
@@ -136,10 +139,11 @@ class ParsedConfigFile {
             value = ((Text) field.getFirstChild()).getData();
           }
           if ("final".equals(field.getTagName()) && field.hasChildNodes()) {
-            finalParameter = "true".equals(((Text) field.getFirstChild())
-                .getData());
+            finalParameter =
+                "true".equals(((Text) field.getFirstChild()).getData());
           }
         }
+
         if ("mapred.child.java.opts".equals(attr) && value != null) {
           Matcher matcher = heapPattern.matcher(value);
           if (matcher.find()) {
@@ -153,7 +157,7 @@ class ParsedConfigFile {
           }
         }
 
-        if ("mapred.job.queue.name".equals(attr) && value != null) {
+        if ("mapred.queue.name".equals(attr) && value != null) {
           queue = value;
         }
 
@@ -161,14 +165,15 @@ class ParsedConfigFile {
           jobName = value;
         }
 
-        clusterMapMB = maybeGetIntValue("mapred.cluster.map.memory.mb", attr,
-            value, clusterMapMB);
-        clusterReduceMB = maybeGetIntValue("mapred.cluster.reduce.memory.mb",
-            attr, value, clusterReduceMB);
-        jobMapMB = maybeGetIntValue("mapred.job.map.memory.mb", attr, value,
-            jobMapMB);
-        jobReduceMB = maybeGetIntValue("mapred.job.reduce.memory.mb", attr,
-            value, jobReduceMB);
+        clusterMapMB =
+            maybeGetIntValue("mapreduce.cluster.mapmemory.mb", attr, value, clusterMapMB);
+        clusterReduceMB =
+            maybeGetIntValue("mapred.cluster.reduce.memory.mb", attr, value,
+                clusterReduceMB);
+        jobMapMB =
+            maybeGetIntValue("mapred.job.map.memory.mb", attr, value, jobMapMB);
+        jobReduceMB =
+            maybeGetIntValue("mapred.job.reduce.memory.mb", attr, value, jobReduceMB);
       }
 
       valid = true;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java Fri Mar  4 04:22:59 2011
@@ -24,10 +24,10 @@ class ParsedLine {
   Properties content;
   LogRecordType type;
 
-  static final Pattern keyValPair = Pattern
-      .compile(" *([a-zA-Z0-9_]+)=\"((?:[^\"\\\\]|\\\\[ .\"\\\\])*)\"");
+  static final Pattern keyValPair =
+      Pattern.compile(" *([a-zA-Z0-9_]+)=\"((?:[^\"\\\\]|\\\\[ .\"\\\\])*)\"");
 
-  @SuppressWarnings("unused") 
+  @SuppressWarnings("unused")
   ParsedLine(String fullLine, int version) {
     super();
 
@@ -47,8 +47,16 @@ class ParsedLine {
 
     String propValPairs = fullLine.substring(firstSpace + 1);
 
-    while (propValPairs.length() > 0 && propValPairs.charAt(0) == ' ') {
-      propValPairs = propValPairs.substring(1);
+    int pvPairsFirstNonBlank = 0;
+    int pvPairsLength = propValPairs.length();
+
+    while (pvPairsLength > pvPairsFirstNonBlank
+        && propValPairs.charAt(pvPairsFirstNonBlank) == ' ') {
+      ++pvPairsFirstNonBlank;
+    }
+
+    if (pvPairsFirstNonBlank != 0) {
+      propValPairs = propValPairs.substring(pvPairsFirstNonBlank);
     }
 
     int cursor = 0;

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/** Reads a file, decompressing it when its path maps to a known codec. */
+class PossiblyDecompressedInputStream extends InputStream {
+  private Decompressor decompressor;
+  private final InputStream coreInputStream;
+
+  public PossiblyDecompressedInputStream(Path inputPath, Configuration conf)
+      throws IOException {
+    CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
+    CompressionCodec inputCodec = codecs.getCodec(inputPath);
+    FSDataInputStream fileIn = inputPath.getFileSystem(conf).open(inputPath);
+    if (inputCodec == null) {
+      decompressor = null;
+      coreInputStream = fileIn;
+    } else {
+      decompressor = CodecPool.getDecompressor(inputCodec);
+      coreInputStream = inputCodec.createInputStream(fileIn, decompressor);
+    }
+  }
+
+  @Override
+  public int read() throws IOException {
+    return coreInputStream.read();
+  }
+
+  @Override
+  public int read(byte[] buffer, int offset, int length) throws IOException {
+    return coreInputStream.read(buffer, offset, length);
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      coreInputStream.close(); // close first: it still uses the decompressor
+    } finally {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+        decompressor = null; // a repeated close() must not return it again
+      }
+    }
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RandomSeedGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RandomSeedGenerator.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RandomSeedGenerator.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RandomSeedGenerator.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The purpose of this class is to generate new random seeds from a master
+ * seed. This is needed to make the Random().next*() calls in rumen and mumak
+ * deterministic so that mumak simulations become deterministically replayable.
+ *
+ * In these tools we need many independent streams of random numbers, some of
+ * which are created dynamically. We seed these streams with the sub-seeds
+ * returned by RandomSeedGenerator.
+ *
+ * For a slightly more complicated approach to generating multiple streams of
+ * random numbers with better theoretical guarantees, see
+ * P. L'Ecuyer, R. Simard, E. J. Chen, and W. D. Kelton,
+ * ``An Object-Oriented Random-Number Package with Many Long Streams and
+ * Substreams'', Operations Research, 50, 6 (2002), 1073--1075
+ * http://www.iro.umontreal.ca/~lecuyer/papers.html
+ * http://www.iro.umontreal.ca/~lecuyer/myftp/streams00/
+ */
+public class RandomSeedGenerator {
+  private static final Log LOG = LogFactory.getLog(RandomSeedGenerator.class);
+
+  /** MD5 algorithm instance, one for each thread. */
+  private static final ThreadLocal<MessageDigest> md5Holder =
+      new ThreadLocal<MessageDigest>() {
+        @Override
+        protected MessageDigest initialValue() {
+          try {
+            return MessageDigest.getInstance("MD5");
+          } catch (NoSuchAlgorithmException nsae) {
+            // MD5 is a mandatory JDK algorithm, so this should never happen.
+            throw new RuntimeException("Can't create MD5 digests", nsae);
+          }
+        }
+      };
+
+  /**
+   * Generates a new random seed from the MD5 digest of the given pair.
+   *
+   * @param streamId a string identifying the stream of random numbers
+   * @param masterSeed higher level master random seed
+   * @return the random seed. Different (streamId, masterSeed) pairs result in
+   *         (vastly) different random seeds.
+   */
+  public static long getSeed(String streamId, long masterSeed) {
+    MessageDigest md5 = md5Holder.get();
+    md5.reset();
+    // '/' separator: ('11',0) and ('1',10) must not produce the same string.
+    String str = streamId + '/' + masterSeed;
+    // Encode explicitly as UTF-8: the platform default charset would make the
+    // seed differ between machines for non-ASCII stream ids.
+    byte[] digest =
+        md5.digest(str.getBytes(java.nio.charset.Charset.forName("UTF-8")));
+    // Create a long from the first 8 bytes of the digest; each signed byte is
+    // shifted into the range 0..255 before being appended.
+    long seed = 0;
+    for (int i = 0; i < 8; i++) {
+      seed = (seed << 8) + ((int) digest[i] + 128);
+    }
+    return seed;
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * Event emitter for reduce task attempts. Its non-final emitters are the ones
+ * shared by all task attempts; its only final emitter reports a successfully
+ * finished reduce attempt.
+ */
+public class ReduceAttempt20LineHistoryEventEmitter extends
+    TaskAttempt20LineEventEmitter {
+
+  static List<SingleEventEmitter> nonFinals =
+      new LinkedList<SingleEventEmitter>();
+  static List<SingleEventEmitter> finals = new LinkedList<SingleEventEmitter>();
+
+  static {
+    // Start from the emitters common to every kind of task attempt.
+    nonFinals.addAll(taskEventNonFinalSEEs);
+
+    finals.add(new ReduceAttemptFinishedEventEmitter());
+  }
+
+  ReduceAttempt20LineHistoryEventEmitter() {
+    super();
+  }
+
+  /**
+   * Builds a {@link ReduceAttemptFinishedEvent} from a line that has
+   * FINISH_TIME, SHUFFLE_FINISHED and SORT_FINISHED attributes and whose
+   * TASK_STATUS equals "success" ignoring case. Any other line yields null.
+   */
+  static private class ReduceAttemptFinishedEventEmitter extends
+      SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
+        HistoryEventEmitter thatg) {
+      if (taskAttemptIDName == null) {
+        return null;
+      }
+
+      TaskAttemptID attemptId = TaskAttemptID.forName(taskAttemptIDName);
+
+      String finish = line.get("FINISH_TIME");
+      String outcome = line.get("TASK_STATUS");
+
+      // A null status never matches, so no separate null test is needed.
+      boolean succeeded =
+          finish != null && "success".equalsIgnoreCase(outcome);
+      if (!succeeded) {
+        return null;
+      }
+
+      String host = line.get("HOSTNAME");
+      String rawCounters = line.get("COUNTERS");
+      String stateString = line.get("STATE_STRING");
+      String shuffleDone = line.get("SHUFFLE_FINISHED");
+      String sortDone = line.get("SORT_FINISHED");
+
+      // Both phase timestamps are required for a reduce-finished event.
+      if (shuffleDone == null || sortDone == null) {
+        return null;
+      }
+
+      ReduceAttempt20LineHistoryEventEmitter owner =
+          (ReduceAttempt20LineHistoryEventEmitter) thatg;
+
+      return new ReduceAttemptFinishedEvent(attemptId,
+          owner.originalTaskType, outcome, Long.parseLong(shuffleDone),
+          Long.parseLong(sortDone), Long.parseLong(finish), host,
+          stateString, maybeParseCounters(rawCounters));
+    }
+  }
+
+  /** Returns the final emitters registered for reduce attempts. */
+  @Override
+  List<SingleEventEmitter> finalSEEs() {
+    return finals;
+  }
+
+  /** Returns the non-final emitters registered for reduce attempts. */
+  @Override
+  List<SingleEventEmitter> nonFinalSEEs() {
+    return nonFinals;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttemptFinishedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttemptFinishedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/ReduceAttemptFinishedEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * History event describing a reduce attempt that finished, together with the
+ * times at which its shuffle and sort phases ended.
+ */
+public class ReduceAttemptFinishedEvent  implements HistoryEvent {
+  private TaskAttemptID attemptId;
+  private TaskID taskId;
+  private TaskType taskType;
+  private String taskStatus;
+  private String hostname;
+  private String state;
+  private long shuffleFinishTime;
+  private long sortFinishTime;
+  private long finishTime;
+  private JhCounters counters;
+
+  /**
+   * Creates the event for one finished reduce attempt. The task id is taken
+   * from the attempt id.
+   *
+   * @param id Attempt Id
+   * @param taskType Type of task
+   * @param taskStatus Status of the task
+   * @param shuffleFinishTime Finish time of the shuffle phase
+   * @param sortFinishTime Finish time of the sort phase
+   * @param finishTime Finish time of the attempt
+   * @param hostname Name of the host where the attempt executed
+   * @param state State of the attempt
+   * @param counters Counters for the attempt
+   */
+  public ReduceAttemptFinishedEvent(TaskAttemptID id,
+      TaskType taskType, String taskStatus,
+      long shuffleFinishTime, long sortFinishTime,
+      long finishTime,
+      String hostname, String state, Counters counters) {
+    this.attemptId = id;
+    this.taskId = id.getTaskID();
+    this.taskType = taskType;
+    this.taskStatus = taskStatus;
+    this.hostname = hostname;
+    this.state = state;
+    this.shuffleFinishTime = shuffleFinishTime;
+    this.sortFinishTime = sortFinishTime;
+    this.finishTime = finishTime;
+    this.counters = new JhCounters(counters, "COUNTERS");
+  }
+
+  /** Get the Task ID */
+  public TaskID getTaskId() {
+    return taskId;
+  }
+
+  /** Get the attempt id */
+  public TaskAttemptID getAttemptId() {
+    return attemptId;
+  }
+
+  /** Get the task type */
+  public TaskType getTaskType() {
+    return taskType;
+  }
+
+  /** Get the task status */
+  public String getTaskStatus() {
+    return taskStatus;
+  }
+
+  /** Get the finish time of the sort phase */
+  public long getSortFinishTime() {
+    return sortFinishTime;
+  }
+
+  /** Get the finish time of the shuffle phase */
+  public long getShuffleFinishTime() {
+    return shuffleFinishTime;
+  }
+
+  /** Get the finish time of the attempt */
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  /** Get the name of the host where the attempt ran */
+  public String getHostname() {
+    return hostname;
+  }
+
+  /** Get the state string */
+  public String getState() {
+    return state;
+  }
+
+  /** Get the counters for the attempt */
+  public JhCounters getCounters() {
+    return counters;
+  }
+
+  /** Get the event type, always {@code REDUCE_ATTEMPT_FINISHED} */
+  public EventType getEventType() {
+    return EventType.REDUCE_ATTEMPT_FINISHED;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A simple wrapper class to make any input stream "rewindable". It could be
+ * made more memory efficient by growing the internal buffer adaptively.
+ *
+ * The wrapped stream is placed behind a {@link BufferedInputStream} that is
+ * marked at its very beginning; {@link #rewind()} resets to that mark. This
+ * class does not override {@code mark}/{@code reset}, so callers cannot move
+ * the remembered position.
+ */
+public class RewindableInputStream extends InputStream {
+  private final InputStream input;
+
+  /**
+   * Constructor that remembers up to 1 MiB (1024 * 1024 bytes) of the
+   * beginning of the stream.
+   *
+   * @param input
+   *          input stream.
+   */
+  public RewindableInputStream(InputStream input) {
+    this(input, 1024 * 1024);
+  }
+
+  /**
+   * Constructor
+   *
+   * @param input
+   *          input stream.
+   * @param maxBytesToRemember
+   *          Maximum number of bytes we need to remember at the beginning of
+   *          the stream. If {@link #rewind()} is called after so many bytes are
+   *          read from the stream, {@link #rewind()} would fail.
+   */
+  public RewindableInputStream(InputStream input, int maxBytesToRemember) {
+    // The buffer is sized to the mark limit so the remembered prefix fits.
+    this.input = new BufferedInputStream(input, maxBytesToRemember);
+    this.input.mark(maxBytesToRemember);
+  }
+
+  @Override
+  public int read() throws IOException {
+    return input.read();
+  }
+
+  @Override
+  public int read(byte[] buffer, int offset, int length) throws IOException {
+    return input.read(buffer, offset, length);
+  }
+
+  /**
+   * Reports the number of bytes readable without blocking, as estimated by
+   * the buffered stream. Without this override the inherited implementation
+   * would always answer 0, even when data is available.
+   */
+  @Override
+  public int available() throws IOException {
+    return input.available();
+  }
+
+  @Override
+  public void close() throws IOException {
+    input.close();
+  }
+
+  /**
+   * Repositions the stream at its beginning.
+   *
+   * @return this stream, positioned at the first byte.
+   * @throws IOException
+   *           if the underlying reset fails, for example because more than
+   *           the remembered number of bytes has been read or the stream has
+   *           been closed.
+   */
+  public InputStream rewind() throws IOException {
+    try {
+      input.reset();
+      return this;
+    } catch (IOException e) {
+      throw new IOException("Unable to rewind the stream", e);
+    }
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+/**
+ * Strategy for turning one parsed history line into at most one
+ * {@link HistoryEvent}.
+ */
+abstract class SingleEventEmitter {
+  /**
+   * Examines a parsed line and produces the event it describes, if any.
+   *
+   * @param line the parsed history line whose attributes are inspected
+   * @param name the id name the line refers to; the emitters in this package
+   *          parse it as a task id or a task attempt id and return null when
+   *          it is null
+   * @param that the emitter on whose behalf this call is made; the emitters in
+   *          this package cast it to their concrete emitter type to read or
+   *          update per-task state
+   * @return the event, or null when the line does not describe this emitter's
+   *         kind of event
+   */
+  abstract HistoryEvent maybeEmitEvent(ParsedLine line, String name,
+      HistoryEventEmitter that);
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Event emitter for task-level lines. It remembers the start time and task
+ * type seen on a task's start line so that later lines of the same task can
+ * reuse them.
+ */
+public class Task20LineHistoryEventEmitter extends HistoryEventEmitter {
+
+  static List<SingleEventEmitter> nonFinals =
+      new LinkedList<SingleEventEmitter>();
+  static List<SingleEventEmitter> finals = new LinkedList<SingleEventEmitter>();
+
+  /** Start time recorded by the task-started emitter, null until then. */
+  Long originalStartTime = null;
+  /** Task type recorded by the task-started emitter, null until then. */
+  TaskType originalTaskType = null;
+
+  static {
+    nonFinals.add(new TaskStartedEventEmitter());
+    nonFinals.add(new TaskUpdatedEventEmitter());
+
+    finals.add(new TaskFinishedEventEmitter());
+    finals.add(new TaskFailedEventEmitter());
+  }
+
+  protected Task20LineHistoryEventEmitter() {
+    super();
+  }
+
+  /**
+   * Emits a {@link TaskStartedEvent} for lines carrying both START_TIME and
+   * TASK_TYPE, and records those two values on the owning emitter.
+   */
+  static private class TaskStartedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
+        HistoryEventEmitter thatg) {
+      if (taskIDName == null) {
+        return null;
+      }
+
+      TaskID id = TaskID.forName(taskIDName);
+
+      String typeName = line.get("TASK_TYPE");
+      String start = line.get("START_TIME");
+      String splits = line.get("SPLITS");
+
+      if (start == null || typeName == null) {
+        return null;
+      }
+
+      Task20LineHistoryEventEmitter owner =
+          (Task20LineHistoryEventEmitter) thatg;
+
+      owner.originalStartTime = Long.parseLong(start);
+      owner.originalTaskType =
+          Version20LogInterfaceUtils.get20TaskType(typeName);
+
+      return new TaskStartedEvent(id, owner.originalStartTime,
+          owner.originalTaskType, splits);
+    }
+  }
+
+  /**
+   * Emits a {@link TaskUpdatedEvent} for any line that carries FINISH_TIME.
+   */
+  static private class TaskUpdatedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
+        HistoryEventEmitter thatg) {
+      if (taskIDName == null) {
+        return null;
+      }
+
+      TaskID id = TaskID.forName(taskIDName);
+
+      String finish = line.get("FINISH_TIME");
+
+      if (finish == null) {
+        return null;
+      }
+
+      return new TaskUpdatedEvent(id, Long.parseLong(finish));
+    }
+  }
+
+  /**
+   * Emits a {@link TaskFinishedEvent} for lines with FINISH_TIME, no ERROR and
+   * a TASK_STATUS of "success" (ignoring case), provided the task type was
+   * recorded earlier by the start line.
+   */
+  static private class TaskFinishedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
+        HistoryEventEmitter thatg) {
+      if (taskIDName == null) {
+        return null;
+      }
+
+      TaskID id = TaskID.forName(taskIDName);
+
+      String outcome = line.get("TASK_STATUS");
+      String finish = line.get("FINISH_TIME");
+
+      String failure = line.get("ERROR");
+
+      String rawCounters = line.get("COUNTERS");
+
+      boolean succeeded =
+          finish != null && failure == null
+              && outcome != null && outcome.equalsIgnoreCase("success");
+      if (!succeeded) {
+        return null;
+      }
+
+      Counters parsedCounters = maybeParseCounters(rawCounters);
+
+      Task20LineHistoryEventEmitter owner =
+          (Task20LineHistoryEventEmitter) thatg;
+
+      // Without a recorded task type there is nothing sensible to report.
+      if (owner.originalTaskType == null) {
+        return null;
+      }
+
+      return new TaskFinishedEvent(id, Long.parseLong(finish),
+          owner.originalTaskType, outcome, parsedCounters);
+    }
+  }
+
+  /**
+   * Emits a {@link TaskFailedEvent} for lines with FINISH_TIME that either
+   * carry an ERROR or have a TASK_STATUS other than "success". The task type
+   * comes from the start line when known, otherwise from the line's own
+   * TASK_TYPE attribute.
+   */
+  static private class TaskFailedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
+        HistoryEventEmitter thatg) {
+      if (taskIDName == null) {
+        return null;
+      }
+
+      TaskID id = TaskID.forName(taskIDName);
+
+      String outcome = line.get("TASK_STATUS");
+      String finish = line.get("FINISH_TIME");
+
+      String typeName = line.get("TASK_TYPE");
+
+      String failure = line.get("ERROR");
+
+      if (finish == null) {
+        return null;
+      }
+
+      boolean unsuccessful =
+          failure != null
+              || (outcome != null && !outcome.equalsIgnoreCase("success"));
+      if (!unsuccessful) {
+        return null;
+      }
+
+      Task20LineHistoryEventEmitter owner =
+          (Task20LineHistoryEventEmitter) thatg;
+
+      TaskType effectiveType = owner.originalTaskType;
+      if (effectiveType == null) {
+        effectiveType = Version20LogInterfaceUtils.get20TaskType(typeName);
+      }
+
+      return new TaskFailedEvent(id, Long.parseLong(finish),
+          effectiveType, failure, outcome, null);
+    }
+  }
+
+  /** Returns the final emitters registered for tasks. */
+  @Override
+  List<SingleEventEmitter> finalSEEs() {
+    return finals;
+  }
+
+  /** Returns the non-final emitters registered for tasks. */
+  @Override
+  List<SingleEventEmitter> nonFinalSEEs() {
+    return nonFinals;
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * Base emitter for task-attempt lines. It holds the emitters shared by all
+ * kinds of task attempts and remembers the start time and task type seen on
+ * an attempt's start line.
+ */
+public abstract class TaskAttempt20LineEventEmitter extends HistoryEventEmitter {
+  static List<SingleEventEmitter> taskEventNonFinalSEEs =
+      new LinkedList<SingleEventEmitter>();
+  static List<SingleEventEmitter> taskEventFinalSEEs =
+      new LinkedList<SingleEventEmitter>();
+
+  /** Port reported when a start line has no usable HTTP_PORT value. */
+  static private final int DEFAULT_HTTP_PORT = 80;
+
+  /** Start time recorded by the attempt-started emitter, null until then. */
+  Long originalStartTime = null;
+  /** Task type recorded by the attempt-started emitter, null until then. */
+  org.apache.hadoop.mapreduce.TaskType originalTaskType = null;
+
+  static {
+    taskEventNonFinalSEEs.add(new TaskAttemptStartedEventEmitter());
+    taskEventNonFinalSEEs.add(new TaskAttemptFinishedEventEmitter());
+    taskEventNonFinalSEEs
+        .add(new TaskAttemptUnsuccessfulCompletionEventEmitter());
+  }
+
+  protected TaskAttempt20LineEventEmitter() {
+    super();
+  }
+
+  /**
+   * Emits a {@link TaskAttemptStartedEvent} for lines carrying both
+   * START_TIME and TASK_TYPE, and records those two values on the owning
+   * emitter.
+   */
+  static private class TaskAttemptStartedEventEmitter extends
+      SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
+        HistoryEventEmitter thatg) {
+      if (taskAttemptIDName == null) {
+        return null;
+      }
+
+      TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
+
+      String startTime = line.get("START_TIME");
+      String taskType = line.get("TASK_TYPE");
+      String trackerName = line.get("TRACKER_NAME");
+      String httpPort = line.get("HTTP_PORT");
+
+      if (startTime != null && taskType != null) {
+        TaskAttempt20LineEventEmitter that =
+            (TaskAttempt20LineEventEmitter) thatg;
+
+        that.originalStartTime = Long.parseLong(startTime);
+        that.originalTaskType =
+            Version20LogInterfaceUtils.get20TaskType(taskType);
+
+        // A line may lack HTTP_PORT altogether; treat that like an empty
+        // value instead of failing with a NullPointerException.
+        int port =
+            (httpPort == null || httpPort.equals("")) ? DEFAULT_HTTP_PORT
+                : Integer.parseInt(httpPort);
+
+        return new TaskAttemptStartedEvent(taskAttemptID,
+            that.originalTaskType, that.originalStartTime, trackerName, port);
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Emits a {@link TaskAttemptFinishedEvent} for lines with FINISH_TIME and a
+   * TASK_STATUS of "success" (ignoring case).
+   */
+  static private class TaskAttemptFinishedEventEmitter extends
+      SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
+        HistoryEventEmitter thatg) {
+      if (taskAttemptIDName == null) {
+        return null;
+      }
+
+      TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+      String status = line.get("TASK_STATUS");
+
+      if (finishTime != null && status != null
+          && status.equalsIgnoreCase("success")) {
+        String hostName = line.get("HOSTNAME");
+        String counters = line.get("COUNTERS");
+        String state = line.get("STATE_STRING");
+
+        TaskAttempt20LineEventEmitter that =
+            (TaskAttempt20LineEventEmitter) thatg;
+
+        return new TaskAttemptFinishedEvent(taskAttemptID,
+            that.originalTaskType, status, Long.parseLong(finishTime),
+            hostName, state, maybeParseCounters(counters));
+      }
+
+      return null;
+    }
+  }
+
+  /**
+   * Emits a {@link TaskAttemptUnsuccessfulCompletionEvent} for lines with
+   * FINISH_TIME and a TASK_STATUS other than "success" (ignoring case).
+   */
+  static private class TaskAttemptUnsuccessfulCompletionEventEmitter extends
+      SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName,
+        HistoryEventEmitter thatg) {
+      if (taskAttemptIDName == null) {
+        return null;
+      }
+
+      TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+      String status = line.get("TASK_STATUS");
+
+      if (finishTime != null && status != null
+          && !status.equalsIgnoreCase("success")) {
+        String hostName = line.get("HOSTNAME");
+        String error = line.get("ERROR");
+
+        TaskAttempt20LineEventEmitter that =
+            (TaskAttempt20LineEventEmitter) thatg;
+
+        return new TaskAttemptUnsuccessfulCompletionEvent(taskAttemptID,
+            that.originalTaskType, status, Long.parseLong(finishTime),
+            hostName, error);
+      }
+
+      return null;
+    }
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptFinishedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptFinishedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptFinishedEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Event to record successful task completion
+ *
+ */
+public class TaskAttemptFinishedEvent  implements HistoryEvent {
+  private TaskID taskId;
+  private TaskAttemptID attemptId;
+  private TaskType taskType;
+  private String taskStatus;
+  private long finishTime;
+  private String hostname;
+  private String state;
+  private JhCounters counters;
+
+  /**
+   * Create an event to record successful finishes for setup and cleanup 
+   * attempts. The task id is taken from the attempt id.
+   * @param id Attempt ID
+   * @param taskType Type of task
+   * @param taskStatus Status of task
+   * @param finishTime Finish time of attempt
+   * @param hostname Host where the attempt executed
+   * @param state State string
+   * @param counters Counters for the attempt
+   */
+  public TaskAttemptFinishedEvent(TaskAttemptID id, 
+      TaskType taskType, String taskStatus, 
+      long finishTime,
+      String hostname, String state, Counters counters) {
+    this.taskId = id.getTaskID();
+    this.attemptId = id;
+    this.taskType = taskType;
+    this.taskStatus = taskStatus;
+    this.finishTime = finishTime;
+    this.hostname = hostname;
+    this.state = state;
+    this.counters = new JhCounters(counters, "COUNTERS");
+  }
+
+  /** Get the task ID */
+  public TaskID getTaskId() { return taskId; }
+  /** Get the task attempt id */
+  public TaskAttemptID getAttemptId() {
+    return attemptId;
+  }
+  /** Get the task type */
+  public TaskType getTaskType() {
+    return taskType;
+  }
+  /** Get the task status */
+  public String getTaskStatus() { return taskStatus; }
+  /** Get the attempt finish time */
+  public long getFinishTime() { return finishTime; }
+  /**
+   * Get the host where the attempt executed. Returned as stored, so it is
+   * null when the event was created without a host name.
+   */
+  public String getHostname() { return hostname; }
+  /**
+   * Get the state string. Returned as stored, so it is null when the event
+   * was created without a state string.
+   */
+  public String getState() { return state; }
+  /** Get the counters for the attempt */
+  public JhCounters getCounters() { return counters; }
+  /** Get the event type */
+  public EventType getEventType() {
+    // NOTE(review): always MAP_ATTEMPT_FINISHED regardless of taskType --
+    // confirm this is intended for non-map attempts.
+    return EventType.MAP_ATTEMPT_FINISHED;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptStartedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptStartedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptStartedEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * History event marking the moment a task attempt began running.
+ */
+public class TaskAttemptStartedEvent implements HistoryEvent {
+  private TaskAttemptID attemptId;
+  private TaskID taskId;
+  private TaskType taskType;
+  private long startTime;
+  private String trackerName;
+  private int httpPort;
+
+  /**
+   * Creates the event for one started attempt. The task id is taken from the
+   * attempt id.
+   *
+   * @param attemptId Id of the attempt
+   * @param taskType Type of task
+   * @param startTime Start time of the attempt
+   * @param trackerName Name of the Task Tracker where attempt is running
+   * @param httpPort The port number of the tracker
+   */
+  public TaskAttemptStartedEvent( TaskAttemptID attemptId,
+      TaskType taskType, long startTime, String trackerName,
+      int httpPort) {
+    this.attemptId = attemptId;
+    this.taskId = attemptId.getTaskID();
+    this.taskType = taskType;
+    this.startTime = startTime;
+    this.trackerName = trackerName;
+    this.httpPort = httpPort;
+  }
+
+  /** Get the task id */
+  public TaskID getTaskId() {
+    return taskId;
+  }
+
+  /** Get the tracker name */
+  public String getTrackerName() {
+    return trackerName;
+  }
+
+  /** Get the start time */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /** Get the task type */
+  public TaskType getTaskType() {
+    return taskType;
+  }
+
+  /** Get the HTTP port */
+  public int getHttpPort() {
+    return httpPort;
+  }
+
+  /** Get the attempt id */
+  public TaskAttemptID getTaskAttemptId() {
+    return attemptId;
+  }
+
+  /** Get the event type, always {@code MAP_ATTEMPT_STARTED} */
+  public EventType getEventType() {
+    return EventType.MAP_ATTEMPT_STARTED;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptUnsuccessfulCompletionEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptUnsuccessfulCompletionEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskAttemptUnsuccessfulCompletionEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Event to record unsuccessful (Killed/Failed) completion of task attempts.
+ * All state is supplied through the constructor and exposed via getters only.
+ */
+public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
+  private TaskAttemptID attemptId;
+  private TaskID taskId;
+  private TaskType taskType;
+  private String status;
+  private long finishTime;
+  private String hostname;
+  private String error;
+
+  /**
+   * Build the record of an attempt that ended without succeeding.
+   * @param id Attempt ID; the task id is taken from it
+   * @param taskType Type of the task
+   * @param status Status of the attempt
+   * @param finishTime Finish time of the attempt
+   * @param hostname Name of the host where the attempt executed
+   * @param error Error string
+   */
+  public TaskAttemptUnsuccessfulCompletionEvent(TaskAttemptID id,
+      TaskType taskType, String status, long finishTime,
+      String hostname, String error) {
+    // The task id is not passed in; it is read off the attempt id.
+    this.attemptId = id;
+    this.taskId = id.getTaskID();
+    this.taskType = taskType;
+    this.status = status;
+    this.finishTime = finishTime;
+    this.hostname = hostname;
+    this.error = error;
+  }
+
+  /** Get the attempt id */
+  public TaskAttemptID getTaskAttemptId() { return this.attemptId; }
+  /** Get the task id, as derived from the attempt id */
+  public TaskID getTaskId() { return this.taskId; }
+  /** Get the task type */
+  public TaskType getTaskType() { return this.taskType; }
+  /** Get the task status */
+  public String getTaskStatus() { return this.status; }
+  /** Get the finish time */
+  public long getFinishTime() { return this.finishTime; }
+  /** Get the name of the host where the attempt executed */
+  public String getHostname() { return this.hostname; }
+  /** Get the error string */
+  public String getError() { return this.error; }
+  /**
+   * Get the event type.
+   * NOTE(review): this is always MAP_ATTEMPT_KILLED; neither the task type
+   * nor the status is consulted -- confirm that is intended.
+   */
+  public EventType getEventType() {
+    return EventType.MAP_ATTEMPT_KILLED;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFailedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFailedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFailedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFailedEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Event to record the failure of a task
+ *
+ */
+public class TaskFailedEvent implements HistoryEvent {
+  private TaskID taskId;
+  private String error;
+  private long finishTime;
+  private TaskType taskType;
+  private TaskAttemptID failedDueToAttempt;
+  private String status;
+
+  /**
+   * Create an event to record task failure
+   * @param id Task ID
+   * @param finishTime Finish time of the task
+   * @param taskType Type of the task
+   * @param error Error String
+   * @param status Status
+   * @param failedDueToAttempt The attempt id due to which the task failed
+   */
+  public TaskFailedEvent(TaskID id, long finishTime, 
+      TaskType taskType, String error, String status,
+      TaskAttemptID failedDueToAttempt) {
+    this.taskId = id;
+    this.error = error;
+    this.finishTime = finishTime;
+    this.taskType = taskType;
+    this.failedDueToAttempt = failedDueToAttempt;
+    this.status = status;
+  }
+
+  /** Get the task id */
+  public TaskID getTaskId() { return taskId; }
+  /** Get the error string */
+  public String getError() { return error; }
+  /** Get the finish time of the attempt */
+  public long getFinishTime() { return finishTime; }
+  /** Get the task type */
+  public TaskType getTaskType() {
+    return taskType;
+  }
+  /** Get the attempt id due to which the task failed */
+  public TaskAttemptID getFailedAttemptID() {
+    return failedDueToAttempt;
+  }
+  /** Get the task status */
+  public String getTaskStatus() { return status; }
+  /** Get the event type */
+  public EventType getEventType() { return EventType.TASK_FAILED; }
+
+  
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFinishedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFinishedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskFinishedEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+
+/**
+ * Event to record the successful completion of a task.
+ * All state is supplied through the constructor and exposed via getters only.
+ */
+public class TaskFinishedEvent implements HistoryEvent {
+  private TaskID taskId;
+  private TaskType taskType;
+  private long finishTime;
+  private String status;
+  private JhCounters counters;
+
+  /**
+   * Build the record of a task that completed successfully.
+   * @param id Task ID
+   * @param finishTime Finish time of the task
+   * @param taskType Type of the task
+   * @param status Status string
+   * @param counters Counters for the task
+   */
+  public TaskFinishedEvent(TaskID id, long finishTime, TaskType taskType,
+                           String status, Counters counters) {
+    this.taskId = id;
+    this.taskType = taskType;
+    this.finishTime = finishTime;
+    this.status = status;
+    // The supplied Counters are converted to a JhCounters right here;
+    // what the "COUNTERS" argument means is defined by JhCounters.
+    this.counters = new JhCounters(counters, "COUNTERS");
+  }
+
+  /** Get task id */
+  public TaskID getTaskId() { return this.taskId; }
+  /** Get task type */
+  public TaskType getTaskType() { return this.taskType; }
+  /** Get the task finish time */
+  public long getFinishTime() { return this.finishTime; }
+  /** Get task status */
+  public String getTaskStatus() { return this.status; }
+  /**
+   * Get task counters, in the JhCounters form built by the constructor.
+   */
+  public JhCounters getCounters() {
+    return this.counters;
+  }
+  /** Get event type */
+  public EventType getEventType() { return EventType.TASK_FINISHED; }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskStartedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskStartedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskStartedEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Event to record the start of a task.
+ * All state is supplied through the constructor and exposed via getters only.
+ */
+public class TaskStartedEvent implements HistoryEvent {
+  private TaskID taskId;
+  private TaskType taskType;
+  private long startTime;
+  private String splitLocations;
+
+  /**
+   * Build the record of a task start.
+   * @param id Task Id
+   * @param startTime Start time of the task
+   * @param taskType Type of the task
+   * @param splitLocations Split locations, applicable for map tasks
+   */
+  public TaskStartedEvent(TaskID id, long startTime, TaskType taskType,
+      String splitLocations) {
+    this.taskId = id;
+    this.taskType = taskType;
+    this.startTime = startTime;
+    this.splitLocations = splitLocations;
+  }
+
+  /** Get the task id */
+  public TaskID getTaskId() { return this.taskId; }
+  /** Get the task type */
+  public TaskType getTaskType() { return this.taskType; }
+  /** Get the start time of the task */
+  public long getStartTime() { return this.startTime; }
+  /**
+   * Get the split locations, applicable for map tasks (stored verbatim).
+   */
+  public String getSplitLocations() {
+    return this.splitLocations;
+  }
+  /** Get the event type */
+  public EventType getEventType() { return EventType.TASK_STARTED; }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskUpdatedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskUpdatedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskUpdatedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TaskUpdatedEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.TaskID;
+
+/**
+ * Event to record updates to a task.
+ * The only piece of task state carried by this event is a finish time.
+ */
+public class TaskUpdatedEvent implements HistoryEvent {
+  private long finishTime;
+  private TaskID taskId;
+
+  /**
+   * Build the record of a task update.
+   * @param id Id of the task
+   * @param finishTime Finish time of the task
+   */
+  public TaskUpdatedEvent(TaskID id, long finishTime) {
+    this.finishTime = finishTime;
+    this.taskId = id;
+  }
+
+  /** Get the task ID */
+  public TaskID getTaskId() { return this.taskId; }
+  /** Get the task finish time */
+  public long getFinishTime() { return this.finishTime; }
+  /**
+   * Get the event type, which for this class is always TASK_UPDATED.
+   */
+  public EventType getEventType() { return EventType.TASK_UPDATED; }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/TopologyBuilder.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.StringTokenizer;
+
+/**
+ * Collects every host mentioned by the job-history events it is fed and
+ * turns the collection into a {@link LoggedNetworkTopology} on request.
+ */
+public class TopologyBuilder {
+  private Set<ParsedHost> allHosts = new HashSet<ParsedHost>();
+
+  /**
+   * Process one {@link HistoryEvent}. Only finished attempts, unsuccessful
+   * attempts and task starts contribute hosts; any other event is ignored.
+   *
+   * @param event
+   *          The {@link HistoryEvent} to be processed.
+   */
+  public void process(HistoryEvent event) {
+    // The list of handled event kinds is deliberately NOT exhaustive.
+    if (event instanceof TaskAttemptFinishedEvent) {
+      recordParsedHost(((TaskAttemptFinishedEvent) event).getHostname());
+      return;
+    }
+    if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
+      recordParsedHost(
+          ((TaskAttemptUnsuccessfulCompletionEvent) event).getHostname());
+      return;
+    }
+    if (event instanceof TaskStartedEvent) {
+      recordSplitHosts(((TaskStartedEvent) event).getSplitLocations());
+    }
+  }
+
+  /**
+   * Process a collection of JobConf {@link Properties}. We do not restrict it
+   * to be called once. The current implementation ignores its argument.
+   *
+   * @param conf
+   *          The job conf properties to be added.
+   */
+  public void process(Properties conf) {
+    // no code
+  }
+
+  /**
+   * Request the builder to build the final object. Callers are expected to
+   * feed no more events or job-conf properties afterwards; this class does
+   * not enforce that, and it hands its own host set to the result.
+   *
+   * @return Parsed {@link LoggedNetworkTopology} object.
+   */
+  public LoggedNetworkTopology build() {
+    return new LoggedNetworkTopology(allHosts);
+  }
+
+  /**
+   * Parse a host name and remember the result, if parsing produced one.
+   * Adding to the set is a no-op for a host that is already present.
+   */
+  private void recordParsedHost(String hostName) {
+    ParsedHost parsed = ParsedHost.parse(hostName);
+    if (parsed != null) {
+      allHosts.add(parsed);
+    }
+  }
+
+  /**
+   * Record every host of a comma-separated split-location list.
+   * A null list is ignored; empty entries are skipped by the tokenizer.
+   */
+  private void recordSplitHosts(String splits) {
+    if (splits == null) {
+      return;
+    }
+
+    StringTokenizer hosts = new StringTokenizer(splits, ",", false);
+    while (hosts.hasMoreTokens()) {
+      recordParsedHost(hosts.nextToken());
+    }
+  }
+}