Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:23:01 UTC

svn commit: r1077515 [2/4] - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapreduce/ test/org/apache/hadoop/tools/rumen/ test/tools/data/rumen/small-trace-test/ test/tools/data/rumen/small-trace-test/counters-fo...

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+public interface HistoryEvent {
+  /** Return this event's type. */
+  EventType getEventType();
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/HistoryEventEmitter.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Counters;
+
+abstract class HistoryEventEmitter {
+  static final private Log LOG = LogFactory.getLog(HistoryEventEmitter.class);
+
+  /** Emitters for events that do not end the entity being tracked. */
+  abstract List<SingleEventEmitter> nonFinalSEEs();
+
+  /** Emitters for terminal events; a match retires this emitter. */
+  abstract List<SingleEventEmitter> finalSEEs();
+
+  protected HistoryEventEmitter() {
+    // no code
+  }
+
+  enum PostEmitAction {
+    NONE, REMOVE_HEE
+  }
+
+  final Pair<Queue<HistoryEvent>, PostEmitAction> emitterCore(ParsedLine line,
+      String name) {
+    Queue<HistoryEvent> results = new LinkedList<HistoryEvent>();
+    PostEmitAction removeEmitter = PostEmitAction.NONE;
+    for (SingleEventEmitter see : nonFinalSEEs()) {
+      HistoryEvent event = see.maybeEmitEvent(line, name, this);
+      if (event != null) {
+        results.add(event);
+      }
+    }
+    for (SingleEventEmitter see : finalSEEs()) {
+      HistoryEvent event = see.maybeEmitEvent(line, name, this);
+      if (event != null) {
+        results.add(event);
+        removeEmitter = PostEmitAction.REMOVE_HEE;
+        break;
+      }
+    }
+    return new Pair<Queue<HistoryEvent>, PostEmitAction>(results, removeEmitter);
+  }
+
+  protected static Counters maybeParseCounters(String counters) {
+    try {
+      return parseCounters(counters);
+    } catch (ParseException e) {
+      LOG.warn("The counter string, \"" + counters + "\" is badly formatted.");
+      return null;
+    }
+  }
+
+  protected static Counters parseCounters(String counters)
+      throws ParseException {
+    if (counters == null) {
+      LOG.warn("HistoryEventEmitters: null counter detected:");
+      return null;
+    }
+
+    counters = counters.replace("\\.", "\\\\.");
+    counters = counters.replace("\\\\(", "\\(");
+    counters = counters.replace("\\\\)", "\\)");
+    counters = counters.replace("\\\\[", "\\[");
+    counters = counters.replace("\\\\]", "\\]");
+
+    org.apache.hadoop.mapred.Counters depForm =
+        org.apache.hadoop.mapred.Counters.fromEscapedCompactString(counters);
+
+    return new Counters(depForm);
+  }
+}
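
For orientation: emitterCore() runs every non-final emitter against the line, then stops at the first final emitter that fires, telling the caller to retire this HistoryEventEmitter. A minimal sketch of a driver loop consuming that contract; the emitter map and the ParsedLine source are hypothetical, this assumes rumen's Pair exposes first()/second() accessors, and since emitterCore() is package-private the sketch lives in the same package:

    // Hypothetical driver: route one parsed line to its emitter.
    void feed(Map<String, HistoryEventEmitter> emitters, ParsedLine line,
        String name, Queue<HistoryEvent> out) {
      HistoryEventEmitter hee = emitters.get(name);
      if (hee == null) {
        return;
      }
      Pair<Queue<HistoryEvent>, HistoryEventEmitter.PostEmitAction> result =
          hee.emitterCore(line, name);
      out.addAll(result.first());
      if (result.second() == HistoryEventEmitter.PostEmitAction.REMOVE_HEE) {
        emitters.remove(name); // a final event ends this job's line stream
      }
    }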

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/InputDemuxer.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@link InputDemuxer} demultiplexes the input files into individual input
+ * streams.
+ */
+public interface InputDemuxer extends Closeable {
+  /**
+   * Bind the {@link InputDemuxer} to a particular file.
+   * 
+   * @param path
+   *          The path to the file it should bind to.
+   * @param conf
+   *          Configuration
+   * @throws IOException
+   *           on read errors only; a file that can be read but is in the
+   *           wrong format is not reported through this exception.
+   */
+  public void bindTo(Path path, Configuration conf) throws IOException;
+
+  /**
+   * Get the next <name, input> pair. The name should preserve the original job
+   * history file or job conf file name. The input stream should be closed
+   * before getNext() is called again; the previous stream becomes invalid once
+   * getNext() is called.
+   * 
+   * @return the next <name, input> pair, or null when no more pairs remain.
+   */
+  public Pair<String, InputStream> getNext() throws IOException;
+}
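
To make the contract above concrete, a usage sketch; DefaultInputDemuxer and the path are placeholders for whatever implementation and input a caller actually has:

    // Hypothetical usage of the InputDemuxer contract.
    InputDemuxer demuxer = new DefaultInputDemuxer(); // placeholder implementation
    demuxer.bindTo(new Path("logs/history"), new Configuration());
    try {
      Pair<String, InputStream> pair;
      while ((pair = demuxer.getNext()) != null) {
        InputStream in = pair.second();
        try {
          // parse the stream named pair.first() here
        } finally {
          in.close(); // close before the next getNext() call
        }
      }
    } finally {
      demuxer.close();
    }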

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounter.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+public class JhCounter {
+  public String name;
+  public String displayName;
+  public long value;
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounterGroup.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounterGroup.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounterGroup.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.List;
+
+public class JhCounterGroup {
+  public String name;
+  public String displayName;
+  public List<JhCounter> counts;
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounters.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounters.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JhCounters.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+
+public class JhCounters {
+  public String name;
+  public List<JhCounterGroup> groups;
+  
+  JhCounters(Counters counters, String name) {
+    this.name = name;
+    this.groups = new ArrayList<JhCounterGroup>();
+    if (counters == null) return;
+    for (CounterGroup group : counters) {
+      JhCounterGroup g = new JhCounterGroup();
+      g.name = group.getName();
+      g.displayName = group.getDisplayName();
+      g.counts = new ArrayList<JhCounter>(group.size());
+      for (Counter counter : group) {
+        JhCounter c = new JhCounter();
+        c.name = counter.getName();
+        c.displayName = counter.getDisplayName();
+        c.value = counter.getValue();
+        g.counts.add(c);
+      }
+      this.groups.add(g);
+    }
+  }
+
+}
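
JhCounters, together with JhCounterGroup and JhCounter above, is a plain serialization-friendly mirror of the mapreduce Counters tree. For illustration, a short walk over the copied structure using only the public fields defined above:

    // Flatten a JhCounters snapshot into "group:counter=value" strings.
    static List<String> dump(JhCounters counters) {
      List<String> lines = new ArrayList<String>();
      for (JhCounterGroup group : counters.groups) {
        for (JhCounter c : group.counts) {
          lines.add(group.name + ":" + c.name + "=" + c.value);
        }
      }
      return lines;
    }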

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.text.ParseException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.mapred.JobPriority;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+public class Job20LineHistoryEventEmitter extends HistoryEventEmitter {
+
+  static List<SingleEventEmitter> nonFinals =
+      new LinkedList<SingleEventEmitter>();
+  static List<SingleEventEmitter> finals = new LinkedList<SingleEventEmitter>();
+
+  Long originalSubmitTime = null;
+
+  static {
+    nonFinals.add(new JobSubmittedEventEmitter());
+    nonFinals.add(new JobPriorityChangeEventEmitter());
+    nonFinals.add(new JobStatusChangedEventEmitter());
+    nonFinals.add(new JobInitedEventEmitter());
+    nonFinals.add(new JobInfoChangeEventEmitter());
+
+    finals.add(new JobUnsuccessfulCompletionEventEmitter());
+    finals.add(new JobFinishedEventEmitter());
+  }
+
+  Job20LineHistoryEventEmitter() {
+    super();
+  }
+
+  static private class JobSubmittedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String submitTime = line.get("SUBMIT_TIME");
+      String jobConf = line.get("JOBCONF");
+      String user = line.get("USER");
+      String jobName = line.get("JOBNAME");
+
+      if (submitTime != null) {
+        Job20LineHistoryEventEmitter that =
+            (Job20LineHistoryEventEmitter) thatg;
+
+        that.originalSubmitTime = Long.parseLong(submitTime);
+
+        Map<JobACL, AccessControlList> jobACLs =
+          new HashMap<JobACL, AccessControlList>();
+        return new JobSubmittedEvent(jobID, jobName, user == null ? "nulluser"
+            : user, that.originalSubmitTime, jobConf, jobACLs);
+      }
+
+      return null;
+    }
+  }
+
+  static private class JobPriorityChangeEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String priority = line.get("JOB_PRIORITY");
+
+      if (priority != null) {
+        return new JobPriorityChangeEvent(jobID, JobPriority.valueOf(priority));
+      }
+
+      return null;
+    }
+  }
+
+  static private class JobInitedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String launchTime = line.get("LAUNCH_TIME");
+      String status = line.get("JOB_STATUS");
+      String totalMaps = line.get("TOTAL_MAPS");
+      String totalReduces = line.get("TOTAL_REDUCES");
+
+      if (launchTime != null && totalMaps != null && totalReduces != null) {
+        return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer
+            .parseInt(totalMaps), Integer.parseInt(totalReduces), status);
+      }
+
+      return null;
+    }
+  }
+
+  static private class JobStatusChangedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String status = line.get("JOB_STATUS");
+
+      if (status != null) {
+        return new JobStatusChangedEvent(jobID, status);
+      }
+
+      return null;
+    }
+  }
+
+  static private class JobInfoChangeEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String launchTime = line.get("LAUNCH_TIME");
+
+      if (launchTime != null) {
+        Job20LineHistoryEventEmitter that =
+            (Job20LineHistoryEventEmitter) thatg;
+        return new JobInfoChangeEvent(jobID, that.originalSubmitTime, Long
+            .parseLong(launchTime));
+      }
+
+      return null;
+    }
+  }
+
+  static private class JobUnsuccessfulCompletionEventEmitter extends
+      SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+
+      String status = line.get("JOB_STATUS");
+
+      String finishedMaps = line.get("FINISHED_MAPS");
+      String finishedReduces = line.get("FINISHED_REDUCES");
+
+      if (status != null && !status.equalsIgnoreCase("success")
+          && finishTime != null && finishedMaps != null
+          && finishedReduces != null) {
+        return new JobUnsuccessfulCompletionEvent(jobID, Long
+            .parseLong(finishTime), Integer.parseInt(finishedMaps), Integer
+            .parseInt(finishedReduces), status);
+      }
+
+      return null;
+    }
+  }
+
+  static private class JobFinishedEventEmitter extends SingleEventEmitter {
+    HistoryEvent maybeEmitEvent(ParsedLine line, String jobIDName,
+        HistoryEventEmitter thatg) {
+      if (jobIDName == null) {
+        return null;
+      }
+
+      JobID jobID = JobID.forName(jobIDName);
+
+      String finishTime = line.get("FINISH_TIME");
+
+      String status = line.get("JOB_STATUS");
+
+      String finishedMaps = line.get("FINISHED_MAPS");
+      String finishedReduces = line.get("FINISHED_REDUCES");
+
+      String failedMaps = line.get("FAILED_MAPS");
+      String failedReduces = line.get("FAILED_REDUCES");
+
+      String counters = line.get("COUNTERS");
+
+      if (status != null && status.equalsIgnoreCase("success")
+          && finishTime != null && finishedMaps != null
+          && finishedReduces != null) {
+        return new JobFinishedEvent(jobID, Long.parseLong(finishTime), Integer
+            .parseInt(finishedMaps), Integer.parseInt(finishedReduces), Integer
+            .parseInt(failedMaps), Integer.parseInt(failedReduces), null, null,
+            maybeParseCounters(counters));
+      }
+
+      return null;
+    }
+  }
+
+  @Override
+  List<SingleEventEmitter> finalSEEs() {
+    return finals;
+  }
+
+  @Override
+  List<SingleEventEmitter> nonFinalSEEs() {
+    return nonFinals;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,665 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * {@link JobBuilder} builds one job. It processes a sequence of
+ * {@link HistoryEvent}s.
+ */
+public class JobBuilder {
+  private static final long BYTES_IN_MEG =
+      StringUtils.TraditionalBinaryPrefix.string2long("1m");
+
+  private String jobID;
+
+  private boolean finalized = false;
+
+  private LoggedJob result = new LoggedJob();
+
+  private Map<String, LoggedTask> mapTasks = new HashMap<String, LoggedTask>();
+  private Map<String, LoggedTask> reduceTasks =
+      new HashMap<String, LoggedTask>();
+  private Map<String, LoggedTask> otherTasks =
+      new HashMap<String, LoggedTask>();
+
+  private Map<String, LoggedTaskAttempt> attempts =
+      new HashMap<String, LoggedTaskAttempt>();
+
+  private Map<ParsedHost, ParsedHost> allHosts =
+      new HashMap<ParsedHost, ParsedHost>();
+
+  /**
+   * The number of splits a task can have, before we ignore them all.
+   */
+  private final static int MAXIMUM_PREFERRED_LOCATIONS = 25;
+  /**
+   * The regular expression used to parse task attempt IDs in job tracker logs
+   */
+  private final static Pattern taskAttemptIDPattern =
+      Pattern.compile(".*_([0-9]+)");
+
+  private int[] attemptTimesPercentiles = null;
+
+  // Use this to search within the java options to get heap sizes.
+  // The heap size number is in Capturing Group 1.
+  // The heap size order-of-magnitude suffix is in Capturing Group 2
+  private static final Pattern heapPattern =
+      Pattern.compile("-Xmx([0-9]+[kKmMgGtT])");
+
+  public JobBuilder(String jobID) {
+    this.jobID = jobID;
+  }
+
+  public String getJobID() {
+    return jobID;
+  }
+
+  {
+    if (attemptTimesPercentiles == null) {
+      attemptTimesPercentiles = new int[19];
+
+      for (int i = 0; i < 19; ++i) {
+        attemptTimesPercentiles[i] = (i + 1) * 5;
+      }
+    }
+  }
+
+  /**
+   * Process one {@link HistoryEvent}
+   * 
+   * @param event
+   *          The {@link HistoryEvent} to be processed.
+   */
+  public void process(HistoryEvent event) {
+    if (finalized) {
+      throw new IllegalStateException(
+          "JobBuilder.process(HistoryEvent event) called after LoggedJob built");
+    }
+
+    // these are in lexicographical order by class name.
+    if (event instanceof JobFinishedEvent) {
+      processJobFinishedEvent((JobFinishedEvent) event);
+    } else if (event instanceof JobInfoChangeEvent) {
+      processJobInfoChangeEvent((JobInfoChangeEvent) event);
+    } else if (event instanceof JobInitedEvent) {
+      processJobInitedEvent((JobInitedEvent) event);
+    } else if (event instanceof JobPriorityChangeEvent) {
+      processJobPriorityChangeEvent((JobPriorityChangeEvent) event);
+    } else if (event instanceof JobStatusChangedEvent) {
+      processJobStatusChangedEvent((JobStatusChangedEvent) event);
+    } else if (event instanceof JobSubmittedEvent) {
+      processJobSubmittedEvent((JobSubmittedEvent) event);
+    } else if (event instanceof JobUnsuccessfulCompletionEvent) {
+      processJobUnsuccessfulCompletionEvent((JobUnsuccessfulCompletionEvent) event);
+    } else if (event instanceof MapAttemptFinishedEvent) {
+      processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
+    } else if (event instanceof ReduceAttemptFinishedEvent) {
+      processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
+    } else if (event instanceof TaskAttemptFinishedEvent) {
+      processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
+    } else if (event instanceof TaskAttemptStartedEvent) {
+      processTaskAttemptStartedEvent((TaskAttemptStartedEvent) event);
+    } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) {
+      processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event);
+    } else if (event instanceof TaskFailedEvent) {
+      processTaskFailedEvent((TaskFailedEvent) event);
+    } else if (event instanceof TaskFinishedEvent) {
+      processTaskFinishedEvent((TaskFinishedEvent) event);
+    } else if (event instanceof TaskStartedEvent) {
+      processTaskStartedEvent((TaskStartedEvent) event);
+    } else if (event instanceof TaskUpdatedEvent) {
+      processTaskUpdatedEvent((TaskUpdatedEvent) event);
+    } else {
+      throw new IllegalArgumentException(
+          "JobBuilder.process(HistoryEvent): unknown event type");
+    }
+  }
+
+  private String extract(Properties conf, String[] names, String defaultValue) {
+    for (String name : names) {
+      String result = conf.getProperty(name);
+
+      if (result != null) {
+        return result;
+      }
+    }
+
+    return defaultValue;
+  }
+
+  private Integer extractMegabytes(Properties conf, String[] names) {
+    String javaOptions = extract(conf, names, null);
+
+    if (javaOptions == null) {
+      return null;
+    }
+
+    Matcher matcher = heapPattern.matcher(javaOptions);
+
+    Integer heapMegabytes = null;
+
+    while (matcher.find()) {
+      String heapSize = matcher.group(1);
+      heapMegabytes =
+          ((int) (StringUtils.TraditionalBinaryPrefix.string2long(heapSize) / BYTES_IN_MEG));
+    }
+
+    return heapMegabytes;
+  }
+
+  private void maybeSetHeapMegabytes(Integer megabytes) {
+    if (megabytes != null) {
+      result.setHeapMegabytes(megabytes);
+    }
+  }
+
+  private void maybeSetJobMapMB(Integer megabytes) {
+    if (megabytes != null) {
+      result.setJobMapMB(megabytes);
+    }
+  }
+
+  private void maybeSetJobReduceMB(Integer megabytes) {
+    if (megabytes != null) {
+      result.setJobReduceMB(megabytes);
+    }
+  }
+
+  /**
+   * Process a collection of JobConf {@link Properties}. We do not restrict it
+   * to be called once. It is okay to process a conf before, during or after the
+   * events.
+   * 
+   * @param conf
+   *          The job conf properties to be added.
+   */
+  public void process(Properties conf) {
+    if (finalized) {
+      throw new IllegalStateException(
+          "JobBuilder.process(Properties conf) called after LoggedJob built");
+    }
+
+    result.setQueue(extract(conf, JobConfPropertyNames.QUEUE_NAMES
+        .getCandidates(), "default"));
+    result.setJobName(extract(conf, JobConfPropertyNames.JOB_NAMES
+        .getCandidates(), null));
+
+    maybeSetHeapMegabytes(extractMegabytes(conf,
+        JobConfPropertyNames.TASK_JAVA_OPTS_S.getCandidates()));
+    maybeSetJobMapMB(extractMegabytes(conf,
+        JobConfPropertyNames.MAP_JAVA_OPTS_S.getCandidates()));
+    maybeSetJobReduceMB(extractMegabytes(conf,
+        JobConfPropertyNames.REDUCE_JAVA_OPTS_S.getCandidates()));
+  }
+
+  /**
+   * Request the builder to build the final object. Once called, the
+   * {@link JobBuilder} would accept no more events or job-conf properties.
+   * 
+   * @return Parsed {@link LoggedJob} object.
+   */
+  public LoggedJob build() {
+    // The main job here is to build CDFs
+    finalized = true;
+
+    // initialize all the per-job statistics gathering places
+    Histogram[] successfulMapAttemptTimes =
+        new Histogram[ParsedHost.numberOfDistances() + 1];
+    for (int i = 0; i < successfulMapAttemptTimes.length; ++i) {
+      successfulMapAttemptTimes[i] = new Histogram();
+    }
+
+    Histogram successfulReduceAttemptTimes = new Histogram();
+    Histogram[] failedMapAttemptTimes =
+        new Histogram[ParsedHost.numberOfDistances() + 1];
+    for (int i = 0; i < failedMapAttemptTimes.length; ++i) {
+      failedMapAttemptTimes[i] = new Histogram();
+    }
+    Histogram failedReduceAttemptTimes = new Histogram();
+
+    Histogram successfulNthMapperAttempts = new Histogram();
+    // Histogram successfulNthReducerAttempts = new Histogram();
+    // Histogram mapperLocality = new Histogram();
+
+    for (LoggedTask task : result.getMapTasks()) {
+      for (LoggedTaskAttempt attempt : task.getAttempts()) {
+        int distance = successfulMapAttemptTimes.length - 1;
+        Long runtime = null;
+
+        if (attempt.getFinishTime() > 0 && attempt.getStartTime() > 0) {
+          runtime = attempt.getFinishTime() - attempt.getStartTime();
+
+          if (attempt.getResult() == Values.SUCCESS) {
+            LoggedLocation host = attempt.getLocation();
+
+            List<LoggedLocation> locs = task.getPreferredLocations();
+
+            if (host != null && locs != null) {
+              for (LoggedLocation loc : locs) {
+                ParsedHost preferredLoc = new ParsedHost(loc);
+
+                distance =
+                    Math.min(distance, preferredLoc
+                        .distance(new ParsedHost(host)));
+              }
+
+              // mapperLocality.enter(distance);
+            }
+
+            if (runtime != null) {
+              successfulMapAttemptTimes[distance].enter(runtime);
+            }
+
+            String attemptID = attempt.getAttemptID();
+
+            if (attemptID != null) {
+              Matcher matcher = taskAttemptIDPattern.matcher(attemptID);
+
+              if (matcher.matches()) {
+                String attemptNumberString = matcher.group(1);
+
+                if (attemptNumberString != null) {
+                  int attemptNumber = Integer.parseInt(attemptNumberString);
+
+                  successfulNthMapperAttempts.enter(attemptNumber);
+                }
+              }
+            }
+          } else {
+            if (attempt.getResult() == Pre21JobHistoryConstants.Values.FAILED) {
+              if (runtime != null) {
+                failedMapAttemptTimes[distance].enter(runtime);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    for (LoggedTask task : result.getReduceTasks()) {
+      for (LoggedTaskAttempt attempt : task.getAttempts()) {
+        Long runtime = null;
+
+        if (attempt.getFinishTime() > 0 && attempt.getStartTime() > 0) {
+          runtime = attempt.getFinishTime() - attempt.getStartTime();
+        }
+        if (attempt.getResult() == Values.SUCCESS) {
+          if (runtime != null) {
+            successfulReduceAttemptTimes.enter(runtime);
+          }
+        } else if (attempt.getResult() == Pre21JobHistoryConstants.Values.FAILED) {
+          if (runtime != null) {
+            failedReduceAttemptTimes.enter(runtime);
+          }
+        }
+      }
+    }
+
+    result.setFailedMapAttemptCDFs(mapCDFArrayList(failedMapAttemptTimes));
+
+    LoggedDiscreteCDF failedReduce = new LoggedDiscreteCDF();
+    failedReduce.setCDF(failedReduceAttemptTimes, attemptTimesPercentiles, 100);
+    result.setFailedReduceAttemptCDF(failedReduce);
+
+    result
+        .setSuccessfulMapAttemptCDFs(mapCDFArrayList(successfulMapAttemptTimes));
+
+    LoggedDiscreteCDF succReduce = new LoggedDiscreteCDF();
+    succReduce.setCDF(successfulReduceAttemptTimes, attemptTimesPercentiles,
+        100);
+    result.setSuccessfulReduceAttemptCDF(succReduce);
+
+    long totalSuccessfulAttempts = 0L;
+    long maxTriesToSucceed = 0L;
+
+    for (Map.Entry<Long, Long> ent : successfulNthMapperAttempts) {
+      totalSuccessfulAttempts += ent.getValue();
+      maxTriesToSucceed = Math.max(maxTriesToSucceed, ent.getKey());
+    }
+
+    if (totalSuccessfulAttempts > 0L) {
+      double[] successAfterI = new double[(int) maxTriesToSucceed + 1];
+      for (int i = 0; i < successAfterI.length; ++i) {
+        successAfterI[i] = 0.0D;
+      }
+
+      for (Map.Entry<Long, Long> ent : successfulNthMapperAttempts) {
+        successAfterI[ent.getKey().intValue()] =
+            ((double) ent.getValue()) / totalSuccessfulAttempts;
+      }
+      result.setMapperTriesToSucceed(successAfterI);
+    } else {
+      result.setMapperTriesToSucceed(null);
+    }
+
+    return result;
+  }
+
+  private ArrayList<LoggedDiscreteCDF> mapCDFArrayList(Histogram[] data) {
+    ArrayList<LoggedDiscreteCDF> result = new ArrayList<LoggedDiscreteCDF>();
+
+    for (Histogram hist : data) {
+      LoggedDiscreteCDF discCDF = new LoggedDiscreteCDF();
+      discCDF.setCDF(hist, attemptTimesPercentiles, 100);
+      result.add(discCDF);
+    }
+
+    return result;
+  }
+
+  private static Values getPre21Value(String name) {
+    if (name.equalsIgnoreCase("JOB_CLEANUP")) {
+      return Values.CLEANUP;
+    }
+    if (name.equalsIgnoreCase("JOB_SETUP")) {
+      return Values.SETUP;
+    }
+
+    return Values.valueOf(name.toUpperCase());
+  }
+
+  private void processTaskUpdatedEvent(TaskUpdatedEvent event) {
+    LoggedTask task = getTask(event.getTaskId().toString());
+    if (task == null) {
+      return;
+    }
+    task.setFinishTime(event.getFinishTime());
+  }
+
+  private void processTaskStartedEvent(TaskStartedEvent event) {
+    LoggedTask task =
+        getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), true);
+    task.setStartTime(event.getStartTime());
+    task.setPreferredLocations(preferredLocationForSplits(event
+        .getSplitLocations()));
+  }
+
+  private void processTaskFinishedEvent(TaskFinishedEvent event) {
+    LoggedTask task =
+        getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false);
+    if (task == null) {
+      return;
+    }
+    task.setFinishTime(event.getFinishTime());
+    task.setTaskStatus(getPre21Value(event.getTaskStatus()));
+    task.incorporateCounters(event.getCounters());
+  }
+
+  private void processTaskFailedEvent(TaskFailedEvent event) {
+    LoggedTask task =
+        getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false);
+    if (task == null) {
+      return;
+    }
+    task.setFinishTime(event.getFinishTime());
+    task.setTaskStatus(getPre21Value(event.getTaskStatus()));
+  }
+
+  private void processTaskAttemptUnsuccessfulCompletionEvent(
+      TaskAttemptUnsuccessfulCompletionEvent event) {
+    LoggedTaskAttempt attempt =
+        getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
+            event.getTaskAttemptId().toString());
+
+    if (attempt == null) {
+      return;
+    }
+
+    attempt.setResult(getPre21Value(event.getTaskStatus()));
+    ParsedHost parsedHost = getAndRecordParsedHost(event.getHostname());
+
+    if (parsedHost != null) {
+      attempt.setLocation(parsedHost.makeLoggedLocation());
+    }
+
+    attempt.setFinishTime(event.getFinishTime());
+  }
+
+  private void processTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
+    LoggedTaskAttempt attempt =
+        getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
+            event.getTaskAttemptId().toString());
+    if (attempt == null) {
+      return;
+    }
+    attempt.setStartTime(event.getStartTime());
+  }
+
+  private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
+    LoggedTaskAttempt attempt =
+        getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
+            event.getAttemptId().toString());
+    if (attempt == null) {
+      return;
+    }
+    attempt.setResult(getPre21Value(event.getTaskStatus()));
+    ParsedHost parsedHost = getAndRecordParsedHost(event.getHostname());
+    if (parsedHost != null) {
+      attempt.setLocation(parsedHost.makeLoggedLocation());
+    }
+    attempt.setFinishTime(event.getFinishTime());
+    attempt.incorporateCounters(event.getCounters());
+  }
+
+  private void processReduceAttemptFinishedEvent(
+      ReduceAttemptFinishedEvent event) {
+    LoggedTaskAttempt attempt =
+        getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
+            event.getAttemptId().toString());
+    if (attempt == null) {
+      return;
+    }
+    attempt.setResult(getPre21Value(event.getTaskStatus()));
+    attempt.setHostName(event.getHostname());
+    // XXX There may be redundant location info available in the event.
+    // We might consider extracting it from this event. Currently this
+    // is redundant, but extracting it would add future-proofing.
+    attempt.setFinishTime(event.getFinishTime());
+    attempt.setShuffleFinished(event.getShuffleFinishTime());
+    attempt.setSortFinished(event.getSortFinishTime());
+    attempt.incorporateCounters(event.getCounters());
+  }
+
+  private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
+    LoggedTaskAttempt attempt =
+        getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
+            event.getAttemptId().toString());
+    if (attempt == null) {
+      return;
+    }
+    attempt.setResult(getPre21Value(event.getTaskStatus()));
+    attempt.setHostName(event.getHostname());
+    // XXX There may be redundant location info available in the event.
+    // We might consider extracting it from this event. Currently this
+    // is redundant, but extracting it would add future-proofing.
+    attempt.setFinishTime(event.getFinishTime());
+    attempt.incorporateCounters(event.getCounters());
+  }
+
+  private void processJobUnsuccessfulCompletionEvent(
+      JobUnsuccessfulCompletionEvent event) {
+    result.setOutcome(Pre21JobHistoryConstants.Values
+        .valueOf(event.getStatus()));
+    result.setFinishTime(event.getFinishTime());
+  }
+
+  private void processJobSubmittedEvent(JobSubmittedEvent event) {
+    result.setJobID(event.getJobId().toString());
+    result.setJobName(event.getJobName());
+    result.setUser(event.getUserName());
+    result.setSubmitTime(event.getSubmitTime());
+  }
+
+  private void processJobStatusChangedEvent(JobStatusChangedEvent event) {
+    result.setOutcome(Pre21JobHistoryConstants.Values
+        .valueOf(event.getStatus()));
+  }
+
+  private void processJobPriorityChangeEvent(JobPriorityChangeEvent event) {
+    result.setPriority(LoggedJob.JobPriority.valueOf(event.getPriority()
+        .toString()));
+  }
+
+  private void processJobInitedEvent(JobInitedEvent event) {
+    result.setLaunchTime(event.getLaunchTime());
+    result.setTotalMaps(event.getTotalMaps());
+    result.setTotalReduces(event.getTotalReduces());
+  }
+
+  private void processJobInfoChangeEvent(JobInfoChangeEvent event) {
+    result.setLaunchTime(event.getLaunchTime());
+  }
+
+  private void processJobFinishedEvent(JobFinishedEvent event) {
+    result.setFinishTime(event.getFinishTime());
+    result.setJobID(jobID);
+    result.setOutcome(Values.SUCCESS);
+  }
+
+  private LoggedTask getTask(String taskIDname) {
+    LoggedTask result = mapTasks.get(taskIDname);
+
+    if (result != null) {
+      return result;
+    }
+
+    result = reduceTasks.get(taskIDname);
+
+    if (result != null) {
+      return result;
+    }
+
+    return otherTasks.get(taskIDname);
+  }
+
+  /**
+   * @param type
+   *          the task type
+   * @param taskIDname
+   *          the task ID name, as a string
+   * @param allowCreate
+   *          if true, we can create a task.
+   * @return the matching task, or null if none exists and allowCreate is false
+   */
+  private LoggedTask getOrMakeTask(TaskType type, String taskIDname,
+      boolean allowCreate) {
+    Map<String, LoggedTask> taskMap = otherTasks;
+    List<LoggedTask> tasks = this.result.getOtherTasks();
+
+    switch (type) {
+    case MAP:
+      taskMap = mapTasks;
+      tasks = this.result.getMapTasks();
+      break;
+
+    case REDUCE:
+      taskMap = reduceTasks;
+      tasks = this.result.getReduceTasks();
+      break;
+
+    default:
+      // no code
+    }
+
+    LoggedTask result = taskMap.get(taskIDname);
+
+    if (result == null && allowCreate) {
+      result = new LoggedTask();
+      result.setTaskType(getPre21Value(type.toString()));
+      result.setTaskID(taskIDname);
+      taskMap.put(taskIDname, result);
+      tasks.add(result);
+    }
+
+    return result;
+  }
+
+  private LoggedTaskAttempt getOrMakeTaskAttempt(TaskType type,
+      String taskIDName, String taskAttemptName) {
+    LoggedTask task = getOrMakeTask(type, taskIDName, false);
+    LoggedTaskAttempt result = attempts.get(taskAttemptName);
+
+    if (result == null && task != null) {
+      result = new LoggedTaskAttempt();
+      result.setAttemptID(taskAttemptName);
+      attempts.put(taskAttemptName, result);
+      task.getAttempts().add(result);
+    }
+
+    return result;
+  }
+
+  private ParsedHost getAndRecordParsedHost(String hostName) {
+    ParsedHost result = ParsedHost.parse(hostName);
+
+    if (result != null) {
+      ParsedHost canonicalResult = allHosts.get(result);
+
+      if (canonicalResult != null) {
+        return canonicalResult;
+      }
+
+      allHosts.put(result, result);
+
+      return result;
+    }
+
+    return null;
+  }
+
+  private ArrayList<LoggedLocation> preferredLocationForSplits(String splits) {
+    if (splits != null) {
+      ArrayList<LoggedLocation> locations = null;
+
+      StringTokenizer tok = new StringTokenizer(splits, ",", false);
+
+      if (tok.countTokens() <= MAXIMUM_PREFERRED_LOCATIONS) {
+        locations = new ArrayList<LoggedLocation>();
+
+        while (tok.hasMoreTokens()) {
+          String nextSplit = tok.nextToken();
+
+          ParsedHost node = getAndRecordParsedHost(nextSplit);
+
+          if (node != null) {
+            locations.add(node.makeLoggedLocation());
+          }
+        }
+
+        return locations;
+      }
+    }
+
+    return null;
+  }
+}
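
JobBuilder's lifecycle is: feed it any number of HistoryEvents and job-conf Properties in any order, then call build() exactly once. A minimal sketch of that sequence, with the event and conf sources left as placeholders:

    // Hypothetical driver: replay one job's history into a JobBuilder.
    LoggedJob replay(String jobID, Iterable<HistoryEvent> events,
        Properties jobConf) {
      JobBuilder builder = new JobBuilder(jobID);
      builder.process(jobConf); // conf may arrive before, during, or after events
      for (HistoryEvent event : events) {
        builder.process(event);
      }
      return builder.build(); // further process() calls now throw
    }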

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapred.JobConf;
+
+public enum JobConfPropertyNames {
+  QUEUE_NAMES("mapred.job.queue.name"),
+  JOB_NAMES("mapred.job.name"),
+  TASK_JAVA_OPTS_S(JobConf.MAPRED_TASK_JAVA_OPTS),
+  MAP_JAVA_OPTS_S(JobConf.MAPRED_TASK_JAVA_OPTS, 
+      JobConf.MAPRED_MAP_TASK_JAVA_OPTS),
+  REDUCE_JAVA_OPTS_S(JobConf.MAPRED_TASK_JAVA_OPTS, 
+      JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS);
+
+  private String[] candidates;
+
+  JobConfPropertyNames(String... candidates) {
+    this.candidates = candidates;
+  }
+
+  public String[] getCandidates() {
+    return candidates;
+  }
+}
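
Each constant carries an ordered list of candidate property names, and JobBuilder.extract() above resolves them first-match-wins. The same lookup in isolation, as an illustrative sketch:

    // First candidate present in the conf wins; null if none is set.
    static String lookup(Properties conf, JobConfPropertyNames key) {
      for (String candidate : key.getCandidates()) {
        String value = conf.getProperty(candidate);
        if (value != null) {
          return value;
        }
      }
      return null;
    }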

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Node;
+import org.w3c.dom.Text;
+import org.xml.sax.SAXException;
+
+/**
+ * {@link JobConfigurationParser} parses the job configuration xml file and
+ * extracts various framework-specific properties. It currently builds a DOM
+ * for the whole file; switching to a stream parser, which would be more
+ * memory efficient, may be done in a future release.
+ */
+public class JobConfigurationParser {
+  final private Set<String> interested;
+
+  /**
+   * Constructor
+   * 
+   * @param interested
+   *          properties we should extract from the job configuration xml.
+   */
+  public JobConfigurationParser(List<String> interested) {
+    this.interested = new HashSet<String>(interested);
+  }
+
+  /**
+   * Parse the job configuration file (as an input stream) and return a
+   * {@link Properties} collection. The input stream will not be closed after
+   * return from the call.
+   * 
+   * @param input
+   *          The input data.
+   * @return A {@link Properties} collection extracted from the job
+   *         configuration xml.
+   * @throws IOException
+   */
+  Properties parse(InputStream input) throws IOException {
+    Properties result = new Properties();
+
+    try {
+      DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+
+      DocumentBuilder db = dbf.newDocumentBuilder();
+
+      Document doc = db.parse(input);
+
+      Element root = doc.getDocumentElement();
+
+      if (!"configuration".equals(root.getTagName())) {
+        System.out.print("root is not a configuration node");
+        return null;
+      }
+
+      NodeList props = root.getChildNodes();
+
+      for (int i = 0; i < props.getLength(); ++i) {
+        Node propNode = props.item(i);
+        if (!(propNode instanceof Element))
+          continue;
+        Element prop = (Element) propNode;
+        if (!"property".equals(prop.getTagName())) {
+          System.out.print("bad conf file: element not <property>");
+        }
+        NodeList fields = prop.getChildNodes();
+        String attr = null;
+        String value = null;
+        @SuppressWarnings("unused")
+        boolean finalParameter = false;
+        for (int j = 0; j < fields.getLength(); j++) {
+          Node fieldNode = fields.item(j);
+          if (!(fieldNode instanceof Element)) {
+            continue;
+          }
+
+          Element field = (Element) fieldNode;
+          if ("name".equals(field.getTagName()) && field.hasChildNodes()) {
+            attr = ((Text) field.getFirstChild()).getData().trim();
+          }
+          if ("value".equals(field.getTagName()) && field.hasChildNodes()) {
+            value = ((Text) field.getFirstChild()).getData();
+          }
+          if ("final".equals(field.getTagName()) && field.hasChildNodes()) {
+            finalParameter =
+                "true".equals(((Text) field.getFirstChild()).getData());
+          }
+        }
+
+        if (interested.contains(attr) && value != null) {
+          result.put(attr, value);
+        }
+      }
+    } catch (ParserConfigurationException e) {
+      return null;
+    } catch (SAXException e) {
+      return null;
+    }
+
+    return result;
+  }
+}
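
A usage sketch for the parser above; the property names are real keys from this patch, the file name is a placeholder, and parse() is package-private, so this would live in the same package. Note that parse() returns null on malformed input and leaves the stream open:

    // Hypothetical usage: pull two properties out of a job conf XML.
    List<String> interested = new ArrayList<String>();
    interested.add("mapred.job.queue.name");
    interested.add("mapred.job.name");
    JobConfigurationParser parser = new JobConfigurationParser(interested);
    InputStream in = new FileInputStream("job_conf.xml"); // placeholder path
    try {
      Properties props = parser.parse(in);
      if (props != null) {
        String queue = props.getProperty("mapred.job.queue.name");
        // ... use the extracted values ...
      }
    } finally {
      in.close(); // parse() does not close the stream
    }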

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobFinishedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobFinishedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobFinishedEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * Event to record the successful completion of a job.
+ */
+public class JobFinishedEvent implements HistoryEvent {
+  private JobID jobId;
+  private long finishTime;
+  private int finishedMaps;
+  private int finishedReduces;
+  private int failedMaps;
+  private int failedReduces;
+  private Counters mapCounters;
+  private Counters reduceCounters;
+  private Counters totalCounters;
+
+  /** 
+   * Create an event to record successful job completion
+   * @param id Job ID
+   * @param finishTime Finish time of the job
+   * @param finishedMaps The number of finished maps
+   * @param finishedReduces The number of finished reduces
+   * @param failedMaps The number of failed maps
+   * @param failedReduces The number of failed reduces
+   * @param mapCounters Map Counters for the job
+   * @param reduceCounters Reduce Counters for the job
+   * @param totalCounters Total Counters for the job
+   */
+  public JobFinishedEvent(JobID id, long finishTime,
+      int finishedMaps, int finishedReduces,
+      int failedMaps, int failedReduces,
+      Counters mapCounters, Counters reduceCounters,
+      Counters totalCounters) {
+    this.jobId = id;
+    this.finishTime = finishTime;
+    this.finishedMaps = finishedMaps;
+    this.finishedReduces = finishedReduces;
+    this.failedMaps = failedMaps;
+    this.failedReduces = failedReduces;
+    this.mapCounters = mapCounters;
+    this.reduceCounters = reduceCounters;
+    this.totalCounters = totalCounters;
+  }
+
+  public EventType getEventType() {
+    return EventType.JOB_FINISHED;
+  }
+
+  /** Get the Job ID */
+  public JobID getJobid() { return jobId; }
+  /** Get the job finish time */
+  public long getFinishTime() { return finishTime; }
+  /** Get the number of finished maps for the job */
+  public int getFinishedMaps() { return finishedMaps; }
+  /** Get the number of finished reducers for the job */
+  public int getFinishedReduces() { return finishedReduces; }
+  /** Get the number of failed maps for the job */
+  public int getFailedMaps() { return failedMaps; }
+  /** Get the number of failed reducers for the job */
+  public int getFailedReduces() { return failedReduces; }
+  /** Get the counters for the job */
+  public Counters getTotalCounters() {
+    return totalCounters;
+  }
+  /** Get the Map counters for the job */
+  public Counters getMapCounters() {
+    return mapCounters;
+  }
+  /** Get the reduce counters for the job */
+  public Counters getReduceCounters() {
+    return reduceCounters;
+  }
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * {@link JobHistoryParser} defines the interface of a Job History file parser.
+ */
+public interface JobHistoryParser extends Closeable {
+  /**
+   * Get the next {@link HistoryEvent}.
+   * @return the next {@link HistoryEvent}, or <code>null</code> if no events
+   *         remain
+   * @throws IOException if an error occurs while reading the history stream
+   */
+  HistoryEvent nextEvent() throws IOException;
+}
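
For orientation, a minimal consumption sketch of this interface (the class
and method names below are illustrative, not part of the patch):

    import java.io.IOException;

    public class HistoryEventDump {
      // Drain a parser until nextEvent() returns null, then close it,
      // per the Closeable contract of JobHistoryParser.
      static void dump(JobHistoryParser parser) throws IOException {
        try {
          HistoryEvent event;
          while ((event = parser.nextEvent()) != null) {
            System.out.println(event.getEventType());
          }
        } finally {
          parser.close();
        }
      }
    }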

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link JobHistoryParserFactory} is a utility that tries each known job
+ * history version in turn and returns a parser that can handle the stream.
+ */
+public class JobHistoryParserFactory {
+  public static JobHistoryParser getParser(RewindableInputStream ris)
+      throws IOException {
+    for (VersionDetector vd : VersionDetector.values()) {
+      boolean canParse = vd.canParse(ris);
+      ris.rewind();
+      if (canParse) {
+        return vd.newInstance(ris);
+      }
+    }
+
+    throw new IOException("No suitable parser.");
+  }
+
+  enum VersionDetector {
+    Hadoop20() {
+
+      @Override
+      public boolean canParse(InputStream input) throws IOException {
+        return Hadoop20JHParser.canParse(input);
+      }
+
+      @Override
+      public JobHistoryParser newInstance(InputStream input) throws IOException {
+        return new Hadoop20JHParser(input);
+      }
+    };
+
+    abstract JobHistoryParser newInstance(InputStream input) throws IOException;
+
+    abstract boolean canParse(InputStream input) throws IOException;
+  }
+}
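
A usage sketch for the factory; it assumes RewindableInputStream (referenced
above) can wrap an ordinary InputStream, which is an assumption about its
constructor, and the class name ParserSelectionExample is illustrative:

    import java.io.FileInputStream;
    import java.io.IOException;

    public class ParserSelectionExample {
      public static void main(String[] args) throws IOException {
        // Probe each known history version; getParser() rewinds the
        // stream between attempts and throws IOException if none match.
        RewindableInputStream ris =
            new RewindableInputStream(new FileInputStream(args[0]));
        JobHistoryParser parser = JobHistoryParserFactory.getParser(ris);
        try {
          HistoryEvent event;
          while ((event = parser.nextEvent()) != null) {
            System.out.println(event.getEventType());
          }
        } finally {
          parser.close();
        }
      }
    }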

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInfoChangeEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInfoChangeEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInfoChangeEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInfoChangeEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * Event to record changes in the submit and launch times of a job.
+ */
+public class JobInfoChangeEvent implements HistoryEvent {
+  private JobID jobId;
+  private long submitTime;
+  private long launchTime;
+
+  /**
+   * Create an event to record the submit and launch times of a job
+   * @param id Job ID
+   * @param submitTime Submit time of the job
+   * @param launchTime Launch time of the job
+   */
+  public JobInfoChangeEvent(JobID id, long submitTime, long launchTime) {
+    this.jobId = id;
+    this.submitTime = submitTime;
+    this.launchTime = launchTime;
+  }
+
+  /** Get the Job ID */
+  public JobID getJobId() { return jobId; }
+  /** Get the Job submit time */
+  public long getSubmitTime() { return submitTime; }
+  /** Get the Job launch time */
+  public long getLaunchTime() { return launchTime; }
+
+  public EventType getEventType() {
+    return EventType.JOB_INFO_CHANGED;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInitedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInitedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInitedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobInitedEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * Event to record the initialization of a job.
+ */
+public class JobInitedEvent implements HistoryEvent {
+  private JobID jobId;
+  private long launchTime;
+  private int totalMaps;
+  private int totalReduces;
+  private String jobStatus;
+
+  /**
+   * Create an event to record job initialization
+   * @param id Job ID
+   * @param launchTime Launch time of the job
+   * @param totalMaps Total number of maps for the job
+   * @param totalReduces Total number of reduces for the job
+   * @param jobStatus Status of the job at initialization
+   */
+  public JobInitedEvent(JobID id, long launchTime, int totalMaps,
+                        int totalReduces, String jobStatus) {
+    this.jobId = id;
+    this.launchTime = launchTime;
+    this.totalMaps = totalMaps;
+    this.totalReduces = totalReduces;
+    this.jobStatus = jobStatus;
+  }
+
+  /** Get the job ID */
+  public JobID getJobId() { return jobId; }
+  /** Get the launch time */
+  public long getLaunchTime() { return launchTime; }
+  /** Get the total number of maps */
+  public int getTotalMaps() { return totalMaps; }
+  /** Get the total number of reduces */
+  public int getTotalReduces() { return totalReduces; }
+  /** Get the status */
+  public String getStatus() { return jobStatus; }
+  /** Get the event type */
+  public EventType getEventType() {
+    return EventType.JOB_INITED;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobPriorityChangeEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobPriorityChangeEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobPriorityChangeEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobPriorityChangeEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapred.JobPriority;
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * Event to record the change of priority of a job.
+ */
+public class JobPriorityChangeEvent implements HistoryEvent {
+  private JobID jobId;
+  private JobPriority priority;
+
+  /** Generate an event to record changes in Job priority
+   * @param id Job Id
+   * @param priority The new priority of the job
+   */
+  public JobPriorityChangeEvent(JobID id, JobPriority priority) {
+    this.jobId = id;
+    this.priority = priority;
+  }
+
+  /** Get the Job ID */
+  public JobID getJobId() { return jobId; }
+  /** Get the job priority */
+  public JobPriority getPriority() {
+    return priority;
+  }
+  /** Get the event type */
+  public EventType getEventType() {
+    return EventType.JOB_PRIORITY_CHANGED;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobStatusChangedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobStatusChangedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobStatusChangedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobStatusChangedEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * Event to record the change of status of a job.
+ */
+public class JobStatusChangedEvent implements HistoryEvent {
+  private JobID jobId;
+  private String jobStatus;
+
+  /**
+   * Create an event to record the change in the Job Status
+   * @param id Job ID
+   * @param jobStatus The new job status
+   */
+  public JobStatusChangedEvent(JobID id, String jobStatus) {
+    this.jobId = id;
+    this.jobStatus = jobStatus;
+  }
+
+  /** Get the Job Id */
+  public JobID getJobId() { return jobId; }
+  /** Get the new job status */
+  public String getStatus() { return jobStatus; }
+  /** Get the event type */
+  public EventType getEventType() {
+    return EventType.JOB_STATUS_CHANGED;
+  }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+/**
+ * Event to record the submission of a job.
+ */
+public class JobSubmittedEvent implements HistoryEvent {
+  private JobID jobId;
+  private String jobName;
+  private String userName;
+  private long submitTime;
+  private String jobConfPath;
+  private Map<JobACL, AccessControlList> jobAcls;
+
+  /**
+   * @deprecated Use
+   *             {@link #JobSubmittedEvent(JobID, String, String, long, String, Map)}
+   *             instead.
+   */
+  @Deprecated
+  public JobSubmittedEvent(JobID id, String jobName, String userName,
+      long submitTime, String jobConfPath) {
+    this(id, jobName, userName, submitTime, jobConfPath,
+        new HashMap<JobACL, AccessControlList>());
+  }
+
+  /**
+   * Create an event to record job submission
+   * @param id The job Id of the job
+   * @param jobName Name of the job
+   * @param userName Name of the user who submitted the job
+   * @param submitTime Time of submission
+   * @param jobConfPath Path of the Job Configuration file
+   * @param jobACLs The configured acls for the job.
+   */
+  public JobSubmittedEvent(JobID id, String jobName, String userName,
+      long submitTime, String jobConfPath,
+      Map<JobACL, AccessControlList> jobACLs) {
+    this.jobId = id;
+    this.jobName = jobName;
+    this.userName = userName;
+    this.submitTime = submitTime;
+    this.jobConfPath = jobConfPath;
+    this.jobAcls = jobACLs;
+  }
+
+  /** Get the Job Id */
+  public JobID getJobId() { return jobId; }
+  /** Get the Job name */
+  public String getJobName() { return jobName; }
+  /** Get the user name */
+  public String getUserName() { return userName; }
+  /** Get the submit time */
+  public long getSubmitTime() { return submitTime; }
+  /** Get the Path for the Job Configuration file */
+  public String getJobConfPath() { return jobConfPath; }
+  /** Get the ACLs configured for the job */
+  public Map<JobACL, AccessControlList> getJobAcls() {
+    return jobAcls;
+  }
+  
+  /** Get the event type */
+  public EventType getEventType() { return EventType.JOB_SUBMITTED; }
+
+}

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobUnsuccessfulCompletionEvent.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobUnsuccessfulCompletionEvent.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JobUnsuccessfulCompletionEvent.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * Event to record the failed or killed completion of a job.
+ */
+public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
+  private JobID jobId;
+  private long finishTime;
+  private int finishedMaps;
+  private int finishedReduces;
+  private String jobStatus;
+
+  /**
+   * Create an event to record unsuccessful completion (killed/failed) of jobs
+   * @param id Job ID
+   * @param finishTime Finish time of the job
+   * @param finishedMaps Number of finished maps
+   * @param finishedReduces Number of finished reduces
+   * @param status Status of the job
+   */
+  public JobUnsuccessfulCompletionEvent(JobID id, long finishTime,
+      int finishedMaps,
+      int finishedReduces, String status) {
+    this.jobId = id;
+    this.finishTime = finishTime;
+    this.finishedMaps = finishedMaps;
+    this.finishedReduces = finishedReduces;
+    this.jobStatus = status;
+  }
+
+  /** Get the Job ID */
+  public JobID getJobId() { return jobId; }
+  /** Get the job finish time */
+  public long getFinishTime() { return finishTime; }
+  /** Get the number of finished maps */
+  public int getFinishedMaps() { return finishedMaps; }
+  /** Get the number of finished reduces */
+  public int getFinishedReduces() { return finishedReduces; }
+  /** Get the status */
+  public String getStatus() { return jobStatus; }
+  /** Get the event type */
+  public EventType getEventType() {
+    if ("FAILED".equals(getStatus())) {
+      return EventType.JOB_FAILED;
+    }
+    return EventType.JOB_KILLED;
+  }
+
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java Fri Mar  4 04:22:59 2011
@@ -23,31 +23,26 @@ import java.io.IOException;
 import java.io.InputStream;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.Decompressor;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
 
 /**
  * A simple wrapper for parsing JSON-encoded data using ObjectMapper.
- * @param <T> The (base) type of the object(s) to be parsed by this parser.
+ * 
+ * @param <T>
+ *          The (base) type of the object(s) to be parsed by this parser.
  */
 class JsonObjectMapperParser<T> implements Closeable {
   private final ObjectMapper mapper;
   private final Class<? extends T> clazz;
   private final JsonParser jsonParser;
-  private final Decompressor decompressor;
 
   /**
    * Constructor.
    * 
-   * @param path 
+   * @param path
    *          Path to the JSON data file, possibly compressed.
    * @param conf
    * @throws IOException
@@ -58,17 +53,7 @@ class JsonObjectMapperParser<T> implemen
     mapper.configure(
         DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
     this.clazz = clazz;
-    FileSystem fs = path.getFileSystem(conf);
-    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
-    InputStream input;
-    if (codec == null) {
-      input = fs.open(path);
-      decompressor = null;
-    } else {
-      FSDataInputStream fsdis = fs.open(path);
-      decompressor = CodecPool.getDecompressor(codec);
-      input = codec.createInputStream(fsdis, decompressor);
-    }
+    InputStream input = new PossiblyDecompressedInputStream(path, conf);
     jsonParser = mapper.getJsonFactory().createJsonParser(input);
   }
 
@@ -84,7 +69,6 @@ class JsonObjectMapperParser<T> implemen
     mapper.configure(
         DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
     this.clazz = clazz;
-    decompressor = null;
     jsonParser = mapper.getJsonFactory().createJsonParser(input);
   }
 
@@ -105,12 +89,6 @@ class JsonObjectMapperParser<T> implemen
 
   @Override
   public void close() throws IOException {
-    try {
-      jsonParser.close();
-    } finally {
-      if (decompressor != null) {
-        CodecPool.returnDecompressor(decompressor);
-      }
-    }
+    jsonParser.close();
   }
 }
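
The hunk above folds the codec bookkeeping into
PossiblyDecompressedInputStream, so a caller now opens a possibly-compressed
trace in one step. A sketch under that assumption (the class and helper
names here are illustrative):

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class TraceOpenExample {
      // Codec detection and decompressor pooling are assumed to live
      // inside PossiblyDecompressedInputStream after this change, so
      // closing the returned stream releases any pooled decompressor.
      static InputStream openTrace(Path path, Configuration conf)
          throws IOException {
        return new PossiblyDecompressedInputStream(path, conf);
      }
    }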

Added: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java?rev=1077515&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java Fri Mar  4 04:22:59 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.codehaus.jackson.JsonEncoding;
+import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+
+/**
+ * Simple wrapper around {@link JsonGenerator} to write objects in JSON format.
+ * @param <T> The type of the objects to be written.
+ */
+public class JsonObjectMapperWriter<T> implements Closeable {
+  private JsonGenerator writer;
+  
+  public JsonObjectMapperWriter(OutputStream output, boolean prettyPrint) throws IOException {
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(
+        SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true);
+    writer = mapper.getJsonFactory().createJsonGenerator(
+        output, JsonEncoding.UTF8);
+    if (prettyPrint) {
+      writer.useDefaultPrettyPrinter();
+    }
+  }
+  
+  public void write(T object) throws IOException {
+    writer.writeObject(object);
+  }
+  
+  @Override
+  public void close() throws IOException {
+    writer.close();
+  }
+}
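
A short writing sketch (the output file name and String payloads are
illustrative; any Jackson-serializable object works as the type parameter):

    import java.io.FileOutputStream;
    import java.io.IOException;

    public class TraceWriteExample {
      public static void main(String[] args) throws IOException {
        // Pretty-print two records to trace.json; closing the generator
        // also closes the underlying stream under Jackson's defaults.
        JsonObjectMapperWriter<String> writer =
            new JsonObjectMapperWriter<String>(
                new FileOutputStream("trace.json"), true);
        try {
          writer.write("first record");
          writer.write("second record");
        } finally {
          writer.close();
        }
      }
    }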

Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java Fri Mar  4 04:22:59 2011
@@ -102,6 +102,24 @@ public class LoggedJob implements DeepCo
     setJobID(jobID);
   }
 
+  void adjustTimes(long adjustment) {
+    submitTime += adjustment;
+    launchTime += adjustment;
+    finishTime += adjustment;
+
+    for (LoggedTask task : mapTasks) {
+      task.adjustTimes(adjustment);
+    }
+
+    for (LoggedTask task : reduceTasks) {
+      task.adjustTimes(adjustment);
+    }
+
+    for (LoggedTask task : otherTasks) {
+      task.adjustTimes(adjustment);
+    }
+  }
+
   @SuppressWarnings("unused")
   // for input parameter ignored.
   @JsonAnySetter
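
A same-package sketch of the new adjustTimes hook (the helper class below is
illustrative; adjustTimes is package-private, so any caller must live in
org.apache.hadoop.tools.rumen):

    public class TimeShiftExample {
      // Rebase a job's absolute timestamps (submit, launch, finish, and
      // every map/reduce/other task) so the trace starts at time zero.
      static void rebase(LoggedJob job, long traceStartTime) {
        job.adjustTimes(-traceStartTime);
      }
    }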

Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java?rev=1077515&r1=1077514&r2=1077515&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java Fri Mar  4 04:22:59 2011
@@ -82,7 +82,7 @@ public class LoggedNetworkTopology imple
    * @param level
    *          the level number
    */
-  LoggedNetworkTopology(HashSet<ParsedHost> hosts, String name, int level) {
+  LoggedNetworkTopology(Set<ParsedHost> hosts, String name, int level) {
 
     this.name = name;
     this.children = null;
@@ -119,6 +119,10 @@ public class LoggedNetworkTopology imple
     }
   }
 
+  LoggedNetworkTopology(Set<ParsedHost> hosts) {
+    this(hosts, "<root>", 0);
+  }
+
   public String getName() {
     return name;
   }